(二): 基于ZeroMQ的实时通讯平台
基于ZeroMQ的實(shí)時(shí)通訊平臺(tái)
上篇:C++分布式實(shí)時(shí)應(yīng)用框架 (Cpp Distributed Real-time Application Framework)----(一):整體介紹
版權(quán)聲明:本文版權(quán)及所用技術(shù)歸屬smartguys團(tuán)隊(duì)所有,對(duì)于抄襲,非經(jīng)同意轉(zhuǎn)載等行為保留法律追究的權(quán)利!
通訊平臺(tái)作為C++分布式實(shí)時(shí)應(yīng)用框架(Cpp Distributed Real-time ApplicationFramework)的最核心模塊,承擔(dān)了分布式實(shí)時(shí)框架的基礎(chǔ)通訊功能。通訊平臺(tái)框架具備了基于Reactor模式的網(wǎng)絡(luò)通訊能力,并且依賴于ZeroMQ庫,因此支持非持久化的message queue的功能?;谂渲梦募碜詣?dòng)建立鏈接關(guān)系的功能,可以和狀態(tài)中心一起配合,實(shí)現(xiàn)無需重啟節(jié)點(diǎn)的動(dòng)態(tài)擴(kuò)容縮容等功能。強(qiáng)大的實(shí)時(shí)監(jiān)控能力,可以實(shí)時(shí)上報(bào)每個(gè)通訊子節(jié)點(diǎn)的TPS和時(shí)延等關(guān)鍵性能數(shù)據(jù)。管控業(yè)務(wù)進(jìn)程的能力,業(yè)務(wù)進(jìn)程的心跳檢測(cè),故障時(shí)自動(dòng)重啟、保證系統(tǒng)正常運(yùn)行。完善的平臺(tái)工具,可以通過通訊平臺(tái)向業(yè)務(wù)進(jìn)程發(fā)送各種命令,如:調(diào)整日志級(jí)別,刷新業(yè)務(wù)參數(shù),啟停業(yè)務(wù)進(jìn)程等等。下面將逐一介紹通訊平臺(tái)的功能細(xì)節(jié)。
一、根據(jù)配置文件自動(dòng)建立通訊鏈接拓?fù)潢P(guān)系
常見的分布式系統(tǒng)通常將進(jìn)程間、節(jié)點(diǎn)間的各種通訊關(guān)系寫死在業(yè)務(wù)代碼中,這是導(dǎo)致代碼復(fù)雜難以理解的原因。我們創(chuàng)新地將所有的通訊關(guān)系提取到AppInit.json配置文件中,業(yè)務(wù)代碼中不再包含任何與通訊連接相關(guān)的內(nèi)容,使業(yè)務(wù)代碼可以更專注于業(yè)務(wù)處理,而不用分心于復(fù)雜的分布式節(jié)點(diǎn)通訊當(dāng)中。下面我們將帶大家看下圖所示通訊關(guān)系的配置。
OLC作為數(shù)據(jù)分發(fā)節(jié)點(diǎn),給多個(gè)業(yè)務(wù)處理節(jié)點(diǎn)分發(fā)消息。業(yè)務(wù)處理節(jié)點(diǎn)內(nèi)部由OCDis接收外部消息,轉(zhuǎn)發(fā)給內(nèi)部的OCPro業(yè)務(wù)處理進(jìn)程,并負(fù)責(zé)處理完后的回包。
OLC配置部分:
"OLC" : {
"AUTO_START" : "YES",
"ENDPOINTS" : [
{ // 用于與SmartMonitor建立心跳
"name" : "MonitorSUB",
"zmq_socket_action" : "CONNECT", // ZMQ的連接模式
"zmq_socket_type" : "ZMQ_SUB" // ZMQ的通訊模式
},
{ // 下發(fā)消息給OCDis,這邊存在轉(zhuǎn)發(fā)功能,支持業(yè)務(wù)實(shí)現(xiàn)按條件轉(zhuǎn)發(fā)
"downstream" : [ "OCDis2OLC"],
"name" : "NE2OLC", // 根據(jù)這個(gè)名字在業(yè)務(wù)代碼中實(shí)現(xiàn)轉(zhuǎn)發(fā)
"zmq_socket_action" : "BIND",
"zmq_socket_type" : "ZMQ_STREAM"
},
{ // OLC到OCDis的鏈路
"name" : "OCDis2OLC",
"statistics_on" : true,
"zmq_socket_action" : "CONNECT",
"zmq_socket_type" : "ZMQ_DEALER"
},
{ // OCDis回OLC的鏈路,之所以來去分開,主要用于實(shí)現(xiàn)優(yōu)雅啟停功能(啟停節(jié)點(diǎn)保證不丟消息)
"name" : "OCDis2OLC_Backway",
"statistics_on" : true,
"zmq_socket_action" : "CONNECT",
"zmq_socket_type" : "ZMQ_DEALER",
"backway_pair" : "OCDis2OLC"
},
{ // 用于與SmartMonitor的命令消息鏈路
"name" : "OLC2Monitor",
"zmq_socket_action" : "CONNECT",
"zmq_socket_type" : "ZMQ_DEALER"
},
],
"ENDPOINT_TO_MONITOR" : "OLC2Monitor",
"INSTANCE_GROUP" : [
{
"instance_endpoints_address" : [
{
"endpoint_name" : "NE2OLC",
"zmq_socket_address" : "tcp://*:6701"
},
{
"endpoint_name" : "OCDis2OLC",
"zmq_socket_address" : [
"tcp://127.0.0.1:7201" // 跨機(jī)的IP地址與端口,配合狀態(tài)中心可實(shí)現(xiàn)自動(dòng)管理,無需人工參與配置
]
},
{
"endpoint_name" : "OCDis2OLC_Backway",
"zmq_socket_address" : [
"tcp://127.0.0.1:7202"
]
},
{
"endpoint_name" : "OLC2Monitor",
"zmq_socket_address" : "ipc://Monitor2Business_IPC"
},
{
"endpoint_name" : "MonitorSUB",
"zmq_socket_address" : "ipc://MonitorPUB"
}
],
"instance_group_name" : "1"
}
]
},
OLC程序:
static const char * ENDPOINT_NE2OLC = "NE2OLC";
static const char * ENDPOINT_OLC2OCDIS = "OCDis2OLC";
static const char * ENDPOINT_MONITORSUB = "MonitorSUB";
int main(int argc, char * argv[]) {
SmartUtilities::Daemonize();
OLCProxyServer server(argc, argv);
if (!server.Initialize(logger))
return -1;
// OLC與OCDis的消息處理
server.SetCallbackOnReceivingMessage(ENDPOINT_OLC2OCDIS, bind(&OLCProxyServer::ReceiveFromOCDis, &server, _1, _2, _3));
// OLC與SmartMonitor的消息處理
server.SetCallbackOnReceivingMessage(ENDPOINT_MONITORSUB, bind(&OLCProxyServer::ReceiveFromMonitorSUB, &server, _1, _2, _3));
// 解析消息包實(shí)現(xiàn)業(yè)務(wù)功能
server.SetPacketParserFunction(ENDPOINT_NE2OLC, bind(&OLCProxyServer::ParseStreamCCR, &server, _1, _2, _3));
// 設(shè)置消息轉(zhuǎn)發(fā)具體規(guī)則
server.SetDownstreamSelector(ENDPOINT_NE2OLC, bind(&OLCProxyServer::StreamSelector, &server, _1, _2));
server.Run();
return 0;
}
二、在線更新鏈接拓?fù)淠芰?/h3>
通訊平臺(tái)支持在線重新讀取更新的配置文件,更新網(wǎng)絡(luò)拓?fù)?,自?dòng)建立新鏈接、斷開舊鏈接的能力。配合狀態(tài)中心可以實(shí)現(xiàn)無需重啟節(jié)點(diǎn)的動(dòng)態(tài)擴(kuò)容縮容等功能。
三、SmartMonitor進(jìn)程監(jiān)控管理業(yè)務(wù)進(jìn)程與SmartTool工具進(jìn)程
業(yè)務(wù)進(jìn)程可以跟SmartMonitor建立通訊聯(lián)系,SmartMonitor可以檢測(cè)業(yè)務(wù)進(jìn)程的心跳,以保證業(yè)務(wù)進(jìn)程的可用。SmartMonitor通過AppCount.json來管理節(jié)點(diǎn)業(yè)務(wù)進(jìn)程,實(shí)現(xiàn)統(tǒng)一啟停等功能。
{
"OCPro": {
"IN": 2, // 業(yè)務(wù)進(jìn)程可以有不同的種類,后面代表進(jìn)程數(shù)
"PS": 3,
"SMS": 4,
},
"OCDis": 3,
"SERVER_TYPE":"OCS" // 節(jié)點(diǎn)的類型
}
還可以通過SmartTool工具進(jìn)程,來給業(yè)務(wù)進(jìn)程發(fā)送各種命令,如:調(diào)整日志級(jí)別,刷新業(yè)務(wù)參數(shù),啟停業(yè)務(wù)進(jìn)程等等。
1. 啟動(dòng)平臺(tái)
SmartMonitor
2. 停平臺(tái)
SmartTool stop all
停指定進(jìn)程(停止后會(huì)被SmartMonitor重新拉起)
SmartTool stop OCPro 停止所有業(yè)務(wù)的OCPro進(jìn)程
SmartTool stop OCPro.IN 停止IN業(yè)務(wù)的OCPro進(jìn)程
SmartTool stop 4829 停止PID為4829的進(jìn)程
3. 調(diào)整應(yīng)用層、框架層日志級(jí)別
其中,日志級(jí)別為error,warn,info,debug,trace
SmartTool log 進(jìn)程名 level=日志級(jí)別,flush=日志級(jí)別
比如: SmartTool log OCProlevel=debug,flush=debug
四、通訊平臺(tái)性能數(shù)據(jù)
進(jìn)程Z負(fù)載控制消息流量,進(jìn)程A負(fù)責(zé)發(fā)、收消息,統(tǒng)計(jì)時(shí)延數(shù)據(jù)。進(jìn)程B收到消息后負(fù)責(zé)回消息。
性能瓶頸主要在A機(jī),既要負(fù)責(zé)收發(fā)包,又要統(tǒng)計(jì)時(shí)延數(shù)據(jù),還要控制流量。
未完待續(xù)...
技術(shù)交流合作QQ群:436466587 歡迎討論交流
總結(jié)
以上是生活随笔為你收集整理的(二): 基于ZeroMQ的实时通讯平台的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Vue.js响应式原理
- 下一篇: ini是什么