前言
昨日看到很多人的评论,是值得我去思考的。今日早读文章由陆金所@枫叶投稿分享。
前言
基于上一篇【第1266期】基于Docker+Consul+Registrator+Nodejs实现服务治理(一)我们已经实现服务注册,本篇文章我们着重实现服务发现。
服务发现
根据上篇的流程图,服务发现功能包括:
服务订阅
动态获取服务列表
本地缓存
缓存服务路由表
服务调用
服务请求的负载均衡策略
变更通知
监听服务节点变化
更新服务路由表
示例
为了验证服务发现机制,api gateway对外提供getRemoteIp
方法,用来获取service web的本地ip地址,获取service web的ip的作用有两个:
成功返回ip地址,则验证服务发现起作用
多次请求返回不同的ip地址,则验证api gateway 负载均衡起作用
先看示例部署图如下:
registratior监控service web,一旦service web 状态发生变化,通知consul cluster做出相应处理,api gateway 订阅consul cluster 的服务,根据负载均衡的策略,把请求转发到对应web处理。
获取service web本地ip的时序图如下:
下面我们开始实现我们的功能。
源码代码地址与目录
源码地址: gateway-test:https://github.com/chenchunyong/gateway-test
目录如下:
app.js------ app启动入口,
discovery.js------ 服务发现
router.js------ 暴露getRemoteIp方法
serviceLocalStorage.js------ 缓存�服务地址
watch.js------ 监控注册中心的service 是否发生变化
startWatch.js------ 启动监控,如果发生变化,则通知缓存更新service列表
Dockerfile ------ 制作docker image
docker-compose.yml------ 服务编排
具体功能实现
下面会对上面提供的功能点依次进行实现(展示代码中只保留核心代码,详细请见代码)。
服务发现discovery.js
,代码如下:
classDiscovery {
connect(...args) {
if (!this.consul) {
debug(`与consul server连接中...`);
//建立连接,//需要注意的时,由于需要动态获取docker内的consul server的地址,//所以host需要配置为consulserver(来自docker-compose配置的consulserver)//发起请求时会经过docker内置的dns server,即可把consulserver替换为具体的consul 服务器 ipthis.consul=newConsul({
host:'consulserver',
...args,
promisify:utils.fromCallback//转化为promise类型
});
}
returnthis;
}
/** * 根据名称获取服务 * @param{*}opts*/asyncgetService(opts) {
if (!this.consul) {
thrownewError
('请先用connect方法进行连接');
}
const {service} = opts;
// 从缓存中获取列表constservices=serviceLocalStorage.getItem(service);
if (services.length>0) {
debug(`命中缓存,key:${service},value:${JSON.stringify(services)}`);
return services;
}
//如果缓存不存在,则获取远程数据let result =awaitthis
.consul
.catalog
.service
.nodes(opts);
debug(`获取服务端数据,key:${service}:value:${JSON.stringify(result[0])}`);
serviceLocalStorage.setItem(service, result[0])
return result[0];
}
}
调用getService
获取注册服务信息,步骤如下:
检查缓存中是否存在,如果存在,则从缓存中获取
如果不存在,则获取最新的服务列表
把从服务端的服务列表,存储到缓存中
2. 变更通知 && 更新本地服务列表
监听服务节点,一旦发生变化,立即通知对应的订阅者,更新本地服务列表。
监听服务节点watch.js
代码:
classWatch {
/** * 监控需要的服务 * @param{*}services * @param{*}onChanged*/watch(services, onChanged) {
constconsul
=this.consul;
if (services ===undefined) {
thrownewError('service 不能为空')
}
if (typeof services ==='string') {
serviceWatch(services);
} elseif (services instanceofArray) {
services.forEach(service=> {
serviceWatch(service);
});
}
// 监听服务核心代码functionserviceWatch(service) {
constwatch=consul.watch({method:consul.catalog.service.nodes, options: {
service }});
// 监听服务如果发现,则触发回调方法watch.on('change', data=> {
constresult= {
name: service,
data };
debug(`监听${service}内容有变化:${JSON.stringify(result)}`);
onChanged(null, result);
});
watch.on('error', error=> {
debug(`监听${service}错误,错误的内容为:${error}`);
onChanged(error, null);
});
}
returnthis;
}
}
由于nodejs是单线程的,需要额外启动一个子进程来监听服务的变化,一旦服务列表有变化,则把服务列表更新到缓存中,请看app.js
代码:
constApplication=require('koa');
constapp=newApplication();
constdebug=require('debug');
constappDebug=debug('dev:app');
constforkDebug=debug('dev:workerProcess');
constchild_process=require('child_process');
constrouter=require('./router');
constserviceLocalStorage=require('./serviceLocalStorage.js');
//监听3000端口app.listen(3000, '0.0.0.0',() => {
appDebug('Server running at 3000');
});
app .use(router.routes())
.use
(router.allowedMethods);
// fork一个子进程,用于监听服务节点变化constworkerProcess=child_process.fork('./startWatch.js');
// 子进程退出workerProcess.on('exit', function (code) {
forkDebug(`子进程已退出,退出码:${code}`);
});
workerProcess.on('error', function (error) {
forkDebug(`error: ${error}`);
});
// 接收变化的服务列表,并更新到缓存中workerProcess.on('message', msg=> {
if (msg) {
appDebug(`从监控中数据变化:${JSON.stringify(msg)}`);
//更新缓存中服务列表serviceLocalStorage.setItem(msg.name, msg.data);
}
});
发送变更的服务列表给主进程,涉及到的代码startWatch.js
:
constwatch=require('./watch');
// 监听服务节点,如果发现变化,则通知主进程的服务列表进行更新watch.connect().watch(['service-web'],(error,data)=>{
process.send(data);
});
3. 服务调用
结合前面说的服务发现,我们来看服务是如何被调用的,涉及到的代码router.js
:
router.get('/service-web/getRemoteIp', async(ctx, next)
=> {
//获取具体ip信息consthost=awaitgetServiceHost('service-web');
constfetchUrl=`http://${host}/getRemoteIp`;
// 获取到具体服务的ip信息constresult=awaitrequest.get(fetchUrl);
debug(`getRemoteIp:${result.text}`);
ctx.body=result.text;
});
/** * 根据service name 获取 service 对应host*/asyncfunctiongetServiceHost(name) {
//根据服务名称获取注册的服务信息,如果缓存中存在,则从缓存中获取,如果不存在则获取数据constservices=awaitdiscovery.getService({service: name});
random =Math.floor(Math.random() * (services.length));
//定义随机数,随机获取ip的负载均衡策略consthost= services[random];
debug(`service host ${services[random]}`)
return host;
}
部署服务
根据前面提供部署图,我们来实现服务编排,
完整的docker-compose.yml
代码如下:
version: '3.0'
services:
# consul server,对外暴露的ui接口为8500,只有在2台consul服务器的情况下集群才起作用
consulserver:
image: progrium/consul:latest
hostname: consulserver
ports:
- "8300"
- "8400"
- "8500:8500"
- "53"
command: -server -ui-dir /ui -data-dir /tmp/consul --bootstrap-expect=2
networks:
- app
# consul server1在consul server服务起来后,加入集群中
consulserver1:
image: progrium/consul:latest
hostname: consulserver1
depends_on:
- "consulserver"
ports:
- "8300"
- "8400"
- "8500"
- "53"
command: -server -data-dir /tmp/consul -join consulserver
networks:
- app
# consul server2在consul server服务起来后,加入集群中
consulserver2:
image: progrium/consul:latest
hostname: consulserver2
depends_on:
-
"consulserver"
ports:
- "8300"
- "8400"
- "8500"
- "53"
command: -server -data-dir /tmp/consul -join consulserver
networks:
- app
# 监听容器中暴露的端口,一定有新的端口,注册到注册中心
registrator:
image: gliderlabs/registrator:master
hostname: registrator
depends_on:
- "consulserver"
volumes:
- "/var/run/docker.sock:/tmp/docker.sock"
command: -internal consul://consulserver:8500
networks:
- app
serviceweb:
image: windavid/node-service-test-web
depends_on:
- "consulserver"
environment:
SERVICE_3000_NAME: service-web
ports:
- "3000"
networks:
- app
# app gatway 暴露对外访问3000端口
gateway:
image: windavid/gateway-test
hostname: gateway
ports:
- "3000:3000"
networks:
- app
networks:
app:
运行以下命令,启动服务:
docker-compose up -d --scale serviceweb=3
验证服务功能
启动服务后,我们在注册中心发现service-web 对应的服务器ip分别为:
172.21.0.6,172.21.0.7,172.21.0.9。
1. 验证服务发现
如前面时序图所述,api gateway对外暴露/service-web/getRemoteIp
来获取本地ip信息。
多次运行以下代码来验证结果:
curl http://127.0.0.1:3000/service-web/getRemoteIp
多次运行的结果为以下随机一个:
{"ip":"172.21.0.6"},{"ip":"172.21.0.7"},{"ip":"172.21.0.9"}
说明服务发现以及负载均衡的功能验证通过。
2. 验证服务通知功能
下线某个service-web服务后,查看gateway-test日志:
dev:watch 监听service-web内容有变化:{"name":
"service-web","data":[{"Node":"consulserver","Address":"172.21.0.2","ServiceID":"registrator:gatewaytest_serviceweb_1:3000","ServiceName":"service-web","ServiceTags":null,"ServiceAddress":"172.21.0.7","ServicePort":3000},{"Node":"consulserver","Address":"172.21.0.2","ServiceID":"registrator:gatewaytest_serviceweb_3:3000","ServiceName":"service-web","ServiceTags":null,"ServiceAddress":"172.21.0.6","ServicePort":3000}]}
发现ip为172.21.0.9的服务已经下线,且通知到订阅者。
总结 && 参考
到此为止,我们已实现了服务的注册与发现。下篇我们介绍分布式服务中的服务跟踪。
参考:
https://github.com/jasonGeng88/blog/blob/master/201704/service_discovery.md
关于本文
作者:@枫叶
原文:https://github.com/chenchunyong/blog/blob/master/microservice/serviceFind.md