社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  docker

【第1267期】基于Docker+Consul+Registrator+Nodejs实现服务治理(二)

前端早读课 • 6 年前 • 958 次点击  

前言

昨日看到很多人的评论,是值得我去思考的。今日早读文章由陆金所@枫叶投稿分享。

前言

基于上一篇【第1266期】基于Docker+Consul+Registrator+Nodejs实现服务治理(一)我们已经实现服务注册,本篇文章我们着重实现服务发现。

服务发现


services register

根据上篇的流程图,服务发现功能包括:

  • 服务订阅

  • 动态获取服务列表

  • 本地缓存

  • 缓存服务路由表

  • 服务调用

  • 服务请求的负载均衡策略

  • 变更通知

  • 监听服务节点变化

  • 更新服务路由表

示例

为了验证服务发现机制,api gateway对外提供getRemoteIp方法,用来获取service web的本地ip地址,获取service web的ip的作用有两个:

  1. 成功返回ip地址,则验证服务发现起作用

  2. 多次请求返回不同的ip地址,则验证api gateway 负载均衡起作用

先看示例部署图如下:


services register

registratior监控service web,一旦service web 状态发生变化,通知consul cluster做出相应处理,api gateway 订阅consul cluster 的服务,根据负载均衡的策略,把请求转发到对应web处理。

获取service web本地ip的时序图如下:

getRemoteIp

下面我们开始实现我们的功能。

源码代码地址与目录

源码地址: gateway-test:https://github.com/chenchunyong/gateway-test

目录如下:

app.js------ app启动入口,
discovery
.js------ 服务发现
router
.js------ 暴露getRemoteIp方法
serviceLocalStorage
.js------ 缓存�服务地址
watch
.js------ 监控注册中心的service 是否发生变化
startWatch
.js------ 启动监控,如果发生变化,则通知缓存更新service列表
Dockerfile ------ 制作docker image
docker
-compose.yml------ 服务编排

具体功能实现

下面会对上面提供的功能点依次进行实现(展示代码中只保留核心代码,详细请见代码)。

1. 服务发现,缓存服务地址

服务发现discovery.js,代码如下:

classDiscovery {
   connect
(...args) {
       
if (!this.consul) {
           debug
(`与consul server连接中...`);
           
//建立连接,//需要注意的时,由于需要动态获取docker内的consul server的地址,//所以host需要配置为consulserver(来自docker-compose配置的consulserver)//发起请求时会经过docker内置的dns server,即可把consulserver替换为具体的consul 服务器 ipthis.consul=newConsul({
               host
:'consulserver',
               
...args,
               promisify
:utils.fromCallback//转化为promise类型
           
});
       
}
       returnthis
;
   
}
   
/**     * 根据名称获取服务     * @param{*}opts*/asyncgetService(opts) {
       
if (!this.consul) {
           thrownewError
('请先用connect方法进行连接');
       
}
       
const {service} = opts;
       
// 从缓存中获取列表constservices=serviceLocalStorage.getItem(service);
       
if (services.length>0) {
           debug
(`命中缓存,key:${service},value:${JSON.stringify(services)}`);
           
return services;
       
}
       
//如果缓存不存在,则获取远程数据let result =awaitthis
           
.consul
           
.catalog
           
.service
           
.nodes(opts);
       debug
(`获取服务端数据,key:${service}:value:${JSON.stringify(result[0])}`);
       serviceLocalStorage
.setItem(service, result[0])
       
return result[0];
   
}
}

调用getService获取注册服务信息,步骤如下:

  1. 检查缓存中是否存在,如果存在,则从缓存中获取

  2. 如果不存在,则获取最新的服务列表

  3. 把从服务端的服务列表,存储到缓存中

2. 变更通知 && 更新本地服务列表

监听服务节点,一旦发生变化,立即通知对应的订阅者,更新本地服务列表。

监听服务节点watch.js代码:

classWatch {
   
/**     * 监控需要的服务     * @param{*}services     * @param{*}onChanged*/watch(services, onChanged) {
       constconsul
=this.consul;
       
if (services ===undefined) {
           thrownewError
('service 不能为空')
       
}
       
if (typeof services ==='string') {
           serviceWatch
(services);
       
} elseif (services instanceofArray) {
           services
.forEach(service=> {
               serviceWatch
(service);
           
});
       
}
       
// 监听服务核心代码functionserviceWatch(service) {
           constwatch
=consul.watch({method:consul.catalog.service.nodes, options: {
                   service                
}});
           
// 监听服务如果发现,则触发回调方法watch.on('change', data=> {
               constresult
= {
                   name
: service,
                   data                
};
               debug
(`监听${service}内容有变化:${JSON.stringify(result)}`);
               onChanged
(null, result);
           
});
           watch
.on('error', error=> {
               debug
(`监听${service}错误,错误的内容为:${error}`);
               onChanged
(error, null);
           
});
       
}
       returnthis
;
   
}
}

由于nodejs是单线程的,需要额外启动一个子进程来监听服务的变化,一旦服务列表有变化,则把服务列表更新到缓存中,请看app.js代码:

constApplication=require('koa');
constapp
=newApplication();
constdebug
=require('debug');
constappDebug
=debug('dev:app');
constforkDebug
=debug('dev:workerProcess');
constchild_process
=require('child_process');
constrouter
=require('./router');
constserviceLocalStorage
=require('./serviceLocalStorage.js');
//监听3000端口app.listen(3000, '0.0.0.0',() => {
   appDebug
('Server running at 3000');
});
app    
.use(router.routes())
   
.use (router.allowedMethods);
// fork一个子进程,用于监听服务节点变化constworkerProcess=child_process.fork('./startWatch.js');
// 子进程退出workerProcess.on('exit', function (code) {
   forkDebug
(`子进程已退出,退出码:${code}`);
});
workerProcess
.on('error', function (error) {
   forkDebug
(`error: ${error}`);
});
// 接收变化的服务列表,并更新到缓存中workerProcess.on('message', msg=> {
   
if (msg) {
       appDebug
(`从监控中数据变化:${JSON.stringify(msg)}`);
       
//更新缓存中服务列表serviceLocalStorage.setItem(msg.name, msg.data);
   
}
});

发送变更的服务列表给主进程,涉及到的代码startWatch.js:

constwatch=require('./watch');
// 监听服务节点,如果发现变化,则通知主进程的服务列表进行更新watch.connect().watch(['service-web'],(error,data)=>{
  process
.send(data);
});
3. 服务调用

结合前面说的服务发现,我们来看服务是如何被调用的,涉及到的代码router.js

router.get('/service-web/getRemoteIp', async(ctx, next) 


    
=> {
   
//获取具体ip信息consthost=awaitgetServiceHost('service-web');
   constfetchUrl
=`http://${host}/getRemoteIp`;
   
// 获取到具体服务的ip信息constresult=awaitrequest.get(fetchUrl);
   debug
(`getRemoteIp:${result.text}`);
   ctx
.body=result.text;
});
/** * 根据service name 获取 service 对应host*/asyncfunctiongetServiceHost(name) {
   
//根据服务名称获取注册的服务信息,如果缓存中存在,则从缓存中获取,如果不存在则获取数据constservices=awaitdiscovery.getService({service: name});
   random
=Math.floor(Math.random() * (services.length));
   
//定义随机数,随机获取ip的负载均衡策略consthost= services[random];
   debug
(`service host ${services[random]}`)
   
return host;
}

部署服务

根据前面提供部署图,我们来实现服务编排,
完整的docker-compose.yml代码如下:

version: '3.0'
services
:
 
# consul server,对外暴露的ui接口为8500,只有在2台consul服务器的情况下集群才起作用
 consulserver
:
   image
: progrium/consul:latest
   hostname
: consulserver
   ports
:
     
- "8300"
     
- "8400"
     
- "8500:8500"
     
- "53"
   command
: -server -ui-dir /ui -data-dir /tmp/consul --bootstrap-expect=2
   networks
:
     
- app
 
# consul server1在consul server服务起来后,加入集群中
 consulserver1
:
   image
: progrium/consul:latest
   hostname
: consulserver1
   depends_on
:
     
- "consulserver"
   ports
:
     
- "8300"
     
- "8400"
     
- "8500"
     
- "53"
   command
: -server -data-dir /tmp/consul -join consulserver
   networks
:
     
- app
 
# consul server2在consul server服务起来后,加入集群中
 consulserver2
:
   image
: progrium/consul:latest
   hostname
: consulserver2
   depends_on
:
     
- "consulserver"
   ports
:
     
- "8300"
     
- "8400"
     
- "8500"
     
- "53"
   command
: -server -data-dir /tmp/consul -join consulserver
   networks
:
     
- app
 
# 监听容器中暴露的端口,一定有新的端口,注册到注册中心
 registrator
:
   image
: gliderlabs/registrator:master
   hostname
: registrator
   depends_on
:
     
- "consulserver"
   volumes
:
     
- "/var/run/docker.sock:/tmp/docker.sock"
   command
: -internal consul://consulserver:8500
   networks
:
     
- app
 serviceweb
:
   image
: windavid/node-service-test-web
   depends_on
:
     
- "consulserver"
   environment
:
     SERVICE_3000_NAME
: service-web
   ports
:
     
- "3000"
   networks
:
     
- app
 
# app gatway 暴露对外访问3000端口
 gateway
:
   image
: windavid/gateway-test
   hostname
: gateway
   ports
:
     
- "3000:3000"
   networks
:
     
- app
networks
:
 app
:

运行以下命令,启动服务:

docker-compose up -d --scale serviceweb=3

验证服务功能

启动服务后,我们在注册中心发现service-web 对应的服务器ip分别为:
172.21.0.6,172.21.0.7,172.21.0.9。

1. 验证服务发现

如前面时序图所述,api gateway对外暴露/service-web/getRemoteIp来获取本地ip信息。

多次运行以下代码来验证结果:

curl http://127.0.0.1:3000/service-web/getRemoteIp

多次运行的结果为以下随机一个:

{"ip":"172.21.0.6"},{"ip":"172.21.0.7"},{"ip":"172.21.0.9"}

说明服务发现以及负载均衡的功能验证通过。

2. 验证服务通知功能

下线某个service-web服务后,查看gateway-test日志:

dev:watch 监听service-web内容有变化:{"name":


    
"service-web","data":[{"Node":"consulserver","Address":"172.21.0.2","ServiceID":"registrator:gatewaytest_serviceweb_1:3000","ServiceName":"service-web","ServiceTags":null,"ServiceAddress":"172.21.0.7","ServicePort":3000},{"Node":"consulserver","Address":"172.21.0.2","ServiceID":"registrator:gatewaytest_serviceweb_3:3000","ServiceName":"service-web","ServiceTags":null,"ServiceAddress":"172.21.0.6","ServicePort":3000}]}

发现ip为172.21.0.9的服务已经下线,且通知到订阅者。

总结 && 参考

到此为止,我们已实现了服务的注册与发现。下篇我们介绍分布式服务中的服务跟踪。

参考:

https://github.com/jasonGeng88/blog/blob/master/201704/service_discovery.md

关于本文

作者:@枫叶
原文:https://github.com/chenchunyong/blog/blob/master/microservice/serviceFind.md


今天看啥 - 高品质阅读平台
本文地址:http://www.jintiankansha.me/t/nDWPrpNepw
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/11630
 
958 次点击