日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ZMQ研究

發(fā)布時(shí)間:2025/7/25 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ZMQ研究 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

FROM:?http://www.open-open.com/lib/view/open1344869477975.html

ZeroMQ,史上最快的消息隊(duì)列

—– ZMQ的學(xué)習(xí)和研究

一、ZeroMQ的背景介紹

引用官方的說法: “ZMQ(以下ZeroMQ簡稱ZMQ)是一個(gè)簡單好用的傳輸層,像框架一樣的一個(gè)socket library,他使得Socket編程更加簡單、簡潔和性能更高。是一個(gè)消息處理隊(duì)列庫,可在多個(gè)線程、內(nèi)核和主機(jī)盒之間彈性伸縮。ZMQ的明確目標(biāo)是“成為標(biāo)準(zhǔn)網(wǎng)絡(luò)協(xié)議棧的一部分,之后進(jìn)入Linux內(nèi)核”。現(xiàn)在還未看到它們的成功。但是,它無疑是極具前景的、并且是人們更加需要的“傳統(tǒng)”BSD套接字之上的一 層封裝。ZMQ讓編寫高性能網(wǎng)絡(luò)應(yīng)用程序極為簡單和有趣。”

近幾年有關(guān)”Message Queue”的項(xiàng)目層出不窮,知名的就有十幾種,這主要是因?yàn)楹竽柖蓵r(shí)代,分布式處理逐漸成為主流,業(yè)界需要一套標(biāo)準(zhǔn)來解決分布式計(jì)算環(huán)境中節(jié)點(diǎn)之間的消息通信。幾年的競爭下來,Apache基金會(huì)旗下的符合AMQP/1.0標(biāo)準(zhǔn)的RabbitMQ已經(jīng)得到了廣泛的認(rèn)可,成為領(lǐng)先的MQ項(xiàng)目。

與RabbitMQ相比,ZMQ并不像是一個(gè)傳統(tǒng)意義上的消息隊(duì)列服務(wù)器,事實(shí)上,它也根本不是一個(gè)服務(wù)器,它更像是一個(gè)底層的網(wǎng)絡(luò)通訊庫,在Socket API之上做了一層封裝,將網(wǎng)絡(luò)通訊、進(jìn)程通訊和線程通訊抽象為統(tǒng)一的API接口。

二、ZMQ是什么?

閱讀了ZMQ的Guide文檔后,我的理解是,這是個(gè)類似于Socket的一系列接口,他跟Socket的區(qū)別是:普通的socket是端到端的(1:1的關(guān)系),而ZMQ卻是可以N:M 的關(guān)系,人們對(duì)BSD套接字的了解較多的是點(diǎn)對(duì)點(diǎn)的連接,點(diǎn)對(duì)點(diǎn)連接需要顯式地建立連接、銷毀連接、選擇協(xié)議(TCP/UDP)和處理錯(cuò)誤等,而ZMQ屏蔽了這些細(xì)節(jié),讓你的網(wǎng)絡(luò)編程更為簡單。ZMQ用于node與node間的通信,node可以是主機(jī)或者是進(jìn)程。

三、本文的目的

在集群對(duì)外提供服務(wù)的過程中,我們有很多的配置,需要根據(jù)需要隨時(shí)更新,那么這個(gè)信息如果推動(dòng)到各個(gè)節(jié)點(diǎn)?并且保證信息的一致性和可靠性?本文在介紹ZMQ基本理論的基礎(chǔ)上,試圖使用ZMQ實(shí)現(xiàn)一個(gè)配置分發(fā)中心。從一個(gè)節(jié)點(diǎn),將信息無誤的分發(fā)到各個(gè)服務(wù)器節(jié)點(diǎn)上,并保證信息正確性和一致性。

四、ZMQ的三個(gè)基本模型

ZMQ提供了三個(gè)基本的通信模型,分別是“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”,我們從這三種模式一窺ZMQ的究竟

ZMQ的hello world!

由Client發(fā)起請(qǐng)求,并等待Server回應(yīng)請(qǐng)求。請(qǐng)求端發(fā)送一個(gè)簡單的hello,服務(wù)端則回應(yīng)一個(gè)world。請(qǐng)求端和服務(wù)端都可以是 1:N 的模型。通常把 1 認(rèn)為是 Server ,N 認(rèn)為是Client 。ZMQ 可以很好的支持路由功能(實(shí)現(xiàn)路由功能的組件叫作Device),把 1:N 擴(kuò)展為N:M (只需要加入若干路由節(jié)點(diǎn))。如圖1所示:

圖1:ZMQ的Request-Reply 通信

服務(wù)端的php程序如下:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 <?php /* * Hello World server * Binds REP socket to tcp://*:5555 * Expects "Hello" from client, replies with "World" * @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt; */ $context = new ZMQContext(1); // Socket to talk to clients $responder = new ZMQSocket($context, ZMQ::SOCKET_REP); $responder-&gt;bind("tcp://*:5555"); while(true) { // Wait for next request from client $request = $responder-&gt;recv(); printf ("Received request: [%s]\n", $request); // Do some 'work' sleep (1); // Send reply back to client $responder-&gt;send("World"); }

Client程序如下:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 <?php /* ?*? Hello World client ?*? Connects REQ socket to tcp://localhost:5555 ?*? Sends "Hello" to server, expects "World" back ?* @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt; ?*/ $context = new ZMQContext(); //? Socket to talk to server echo "Connecting to hello world server...\n"; $requester = new ZMQSocket($context, ZMQ::SOCKET_REQ); $requester-&gt;connect("tcp://localhost:5555"); for($request_nbr = 0; $request_nbr != 10; $request_nbr++) { ????printf ("Sending request %d...\n", $request_nbr); ????$requester-&gt;send("Hello"); ????$reply = $requester-&gt;recv(); ????printf ("Received reply %d: [%s]\n", $request_nbr, $reply); }

從以上的過程,我們可以了解到使用ZMQ寫基本的程序的方法,需要注意的是:

a) 服務(wù)端和客戶端無論誰先啟動(dòng),效果是相同的,這點(diǎn)不同于Socket。

b) 在服務(wù)端收到信息以前,程序是阻塞的,會(huì)一直等待客戶端連接上來。

c) 服務(wù)端收到信息以后,會(huì)send一個(gè)“World”給客戶端。值得注意的是一定是client連接上來以后,send消息給Server,然后Server再rev然后響應(yīng)client,這種一問一答式的。如果Server先send,client先rev是會(huì)報(bào)錯(cuò)的。

d) ZMQ通信通信單元是消息,他除了知道Bytes的大小,他并不關(guān)心的消息格式。因此,你可以使用任何你覺得好用的數(shù)據(jù)格式。Xml、Protocol Buffers、Thrift、json等等。

e) 雖然可以使用ZMQ實(shí)現(xiàn)HTTP協(xié)議,但是,這絕不是他所擅長的。

ZMQ的Publish-subscribe模式

我們可以想象一下天氣預(yù)報(bào)的訂閱模式,由一個(gè)節(jié)點(diǎn)提供信息源,由其他的節(jié)點(diǎn),接受信息源的信息,如圖2所示:

圖2:ZMQ的Publish-subscribe

示例代碼如下 :

Publisher:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 <?php /* * Weather update server * Binds PUB socket to tcp://*:5556 * Publishes random weather updates * @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt; */ // Prepare our context and publisher $context = new ZMQContext(); $publisher = $context-&gt;getSocket(ZMQ::SOCKET_PUB); $publisher-&gt;bind("tcp://*:5556"); while (true) { // Get values that will fool the boss $zipcode = mt_rand(0, 100000); $temperature = mt_rand(-80, 135); $relhumidity = mt_rand(10, 60); // Send message to all subscribers $update = sprintf ("%05d %d %d", $zipcode, $temperature, $relhumidity); $publisher-&gt;send($update); }</pre> Subscriber <pre>&lt;?php /* * Weather update client * Connects SUB socket to tcp://localhost:5556 * Collects weather updates and finds avg temp in zipcode * @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt; */ $context = new ZMQContext(); // Socket to talk to server echo "Collecting updates from weather server…", PHP_EOL; $subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB); $subscriber-&gt;connect("tcp://localhost:5556"); // Subscribe to zipcode, default is NYC, 10001 $filter = $_SERVER['argc'] &gt; 1 ? $_SERVER['argv'][1] : "10001"; $subscriber-&gt;setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter); // Process 100 updates $total_temp = 0; for ($update_nbr = 0; $update_nbr &lt; 100; $update_nbr++) { $string = $subscriber-&gt;recv(); sscanf ($string, "%d %d %d", $zipcode, $temperature, $relhumidity); $total_temp += $temperature; } printf ("Average temperature for zipcode '%s' was %dF\n", $filter, (int) ($total_temp / $update_nbr));

這段代碼講的是,服務(wù)器端生成隨機(jī)數(shù)zipcode、temperature、relhumidity分別代表城市代碼、溫度值和濕度值。然后不斷的廣播信息,而客戶端通過設(shè)置過濾參數(shù),接受特定城市代碼的信息,收集完了以后,做一個(gè)平均值。

a) 與Hello World不同的是,Socket的類型變成SOCKET_PUB和SOCKET_SUB類型。

b) 客戶端需要$subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);設(shè)置一個(gè)過濾值,相當(dāng)于設(shè)定一個(gè)訂閱頻道,否則什么信息也收不到。

c) 服務(wù)器端一直不斷的廣播中,如果中途有Subscriber端退出,并不影響他繼續(xù)的廣播,當(dāng)Subscriber再連接上來的時(shí)候,收到的就是后來發(fā)送的新的信息了。這對(duì)比較晚加入的,或者是中途離開的訂閱者,必然會(huì)丟失掉一部分信息,這是這個(gè)模式的一個(gè)問題,所謂的Slow joiner。稍后,會(huì)解決這個(gè)問題。

d) 但是,如果Publisher中途離開,所有的Subscriber會(huì)hold住,等待Publisher再上線的時(shí)候,會(huì)繼續(xù)接受信息。

ZMQ的PipeLine模型

想象一下這樣的場景,如果需要統(tǒng)計(jì)各個(gè)機(jī)器的日志,我們需要將統(tǒng)計(jì)任務(wù)分發(fā)到各個(gè)節(jié)點(diǎn)機(jī)器上,最后收集統(tǒng)計(jì)結(jié)果,做一個(gè)匯總。PipeLine比較適合于這種場景,他的結(jié)構(gòu)圖,如圖3所示。

圖3:ZMQ的PipeLine模型

Parallel task ventilator in PHP

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 <?php /* * Task ventilator * Binds PUSH socket to tcp://localhost:5557 * Sends batch of tasks to workers via that socket * @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt; */ $context = new ZMQContext(); // Socket to send messages on $sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH); $sender-&gt;bind("tcp://*:5557"); echo "Press Enter when the workers are ready: "; $fp = fopen('php://stdin', 'r'); $line = fgets($fp, 512); fclose($fp); echo "Sending tasks to workers…", PHP_EOL; // The first message is "0" and signals start of batch $sender-&gt;send(0); // Send 100 tasks $total_msec = 0; // Total expected cost in msecs for ($task_nbr = 0; $task_nbr &lt; 100; $task_nbr++) { // Random workload from 1 to 100msecs $workload = mt_rand(1, 100); $total_msec += $workload; $sender-&gt;send($workload); } printf ("Total expected cost: %d msec\n", $total_msec); sleep (1); // Give 0MQ time to deliver

Parallel task worker in PHP

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 <?php /* * Task worker * Connects PULL socket to tcp://localhost:5557 * Collects workloads from ventilator via that socket * Connects PUSH socket to tcp://localhost:5558 * Sends results to sink via that socket * @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt; */ $context = new ZMQContext(); // Socket to receive messages on $receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL); $receiver-&gt;connect("tcp://localhost:5557"); // Socket to send messages to $sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH); $sender-&gt;connect("tcp://localhost:5558"); // Process tasks forever while (true) { $string = $receiver-&gt;recv(); // Simple progress indicator for the viewer echo $string, PHP_EOL; // Do the work usleep($string * 1000); // Send results to sink $sender-&gt;send(""); }

Parallel task sink in PHP

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 <?php /* * Task sink * Binds PULL socket to tcp://localhost:5558 * Collects results from workers via that socket * @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt; */ // Prepare our context and socket $context = new ZMQContext(); $receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL); $receiver-&gt;bind("tcp://*:5558"); // Wait for start of batch $string = $receiver-&gt;recv(); // Start our clock now $tstart = microtime(true); // Process 100 confirmations $total_msec = 0; // Total calculated cost in msecs for ($task_nbr = 0; $task_nbr &lt; 100; $task_nbr++) { $string = $receiver-&gt;recv(); if($task_nbr % 10 == 0) { echo ":"; } else { echo "."; } } $tend = microtime(true); $total_msec = ($tend - $tstart) * 1000; echo PHP_EOL; printf ("Total elapsed time: %d msec", $total_msec); echo PHP_EOL;

從程序中,我們可以看到,task ventilator使用的是SOCKET_PUSH,將任務(wù)分發(fā)到Worker節(jié)點(diǎn)上。而Worker節(jié)點(diǎn)上,使用SOCKET_PULL從上游接受任務(wù),并使用SOCKET_PUSH將結(jié)果匯集到Slink。值得注意的是,任務(wù)的分發(fā)的時(shí)候也同樣有一個(gè)負(fù)載均衡的路由功能,worker可以隨時(shí)自由加入,task ventilator可以均衡將任務(wù)分發(fā)出去。

五、其他擴(kuò)展模式

通常,一個(gè)節(jié)點(diǎn),即可以作為Server,同時(shí)也能作為Client,通過PipeLine模型中的Worker,他向上連接著任務(wù)分發(fā),向下連接著結(jié)果搜集的Sink機(jī)器。因此,我們可以借助這種特性,豐富的擴(kuò)展原有的三種模式。例如,一個(gè)代理Publisher,作為一個(gè)內(nèi)網(wǎng)的Subscriber接受信息,同時(shí)將信息,轉(zhuǎn)發(fā)到外網(wǎng),其結(jié)構(gòu)圖如圖4所示。


圖4:ZMQ的擴(kuò)展模式

六、多個(gè)服務(wù)器

ZMQ和Socket的區(qū)別在于,前者支持N:M的連接,而后者則只是1:1的連接,那么一個(gè)Client連接多個(gè)Server的情況是怎樣的呢,我們通過圖5來說明。

圖5:ZMQ的N:1的連接情況

我們假設(shè)Client有R1,R2,R3,R4四個(gè)任務(wù),我們只需要一個(gè)ZMQ的Socket,就可以連接四個(gè)服務(wù),他能夠自動(dòng)均衡的分配任務(wù)。如圖5所示,R1,R4自動(dòng)分配到了節(jié)點(diǎn)A,R2到了B,R3到了C。如果我們是N:M的情況呢?這個(gè)擴(kuò)展起來,也不難,如圖6所示。

圖6:N:M的連接

我們通過一個(gè)中間結(jié)點(diǎn)(Broker)來進(jìn)行負(fù)載均衡的功能。我們通過代碼了解,其中的Client和我們的Hello World的Client端是一樣的,而Server端的不同是,他不需要監(jiān)聽端口,而是需要連接Broker的端口,接受需要處理的信息。所以,我們重點(diǎn)閱讀Broker的代碼:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 <?php /* * Simple request-reply broker * @author Ian Barber &lt;ian(dot)barber(at)gmail(dot)com&gt; */ // Prepare our context and sockets $context = new ZMQContext(); $frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER); $backend = new ZMQSocket($context, ZMQ::SOCKET_DEALER); $frontend-&gt;bind("tcp://*:5559"); $backend-&gt;bind("tcp://*:5560"); // Initialize poll set $poll = new ZMQPoll(); $poll-&gt;add($frontend, ZMQ::POLL_IN); $poll-&gt;add($backend, ZMQ::POLL_IN); $readable = $writeable = array(); // Switch messages between sockets while(true) { $events = $poll-&gt;poll($readable, $writeable); foreach($readable as $socket) { if($socket === $frontend) { // Process all parts of the message while(true) { $message = $socket-&gt;recv(); // Multipart detection $more = $socket-&gt;getSockOpt(ZMQ::SOCKOPT_RCVMORE); $backend-&gt;send($message, $more ? ZMQ::MODE_SNDMORE : null); if(!$more) { break; // Last message part } } } else if($socket === $backend) { $message = $socket-&gt;recv(); // Multipart detection $more = $socket-&gt;getSockOpt(ZMQ::SOCKOPT_RCVMORE); $frontend-&gt;send($message, $more ? ZMQ::MODE_SNDMORE : null); if(!$more) { break; // Last message part } } } }

Broker監(jiān)聽了兩個(gè)端口,接受從多個(gè)Client端發(fā)送過來的數(shù)據(jù),并將數(shù)據(jù),轉(zhuǎn)發(fā)給Server。在Broker中,我們監(jiān)聽了兩個(gè)端口,使用了兩個(gè)Socket,那么對(duì)于多個(gè)Socket的情況,我們是不需要通過輪詢的方式去處理數(shù)據(jù)的,在之前,我們可以使用libevent實(shí)現(xiàn),異步的信息處理和傳輸。而現(xiàn)在,我們只需要使用ZMQ的$poll->poll以實(shí)現(xiàn)多個(gè)Socket的異步處理。

七、進(jìn)程間的通信

ZMQ不僅能通過TCP完成節(jié)點(diǎn)間的通信,也可以通過Socket文件完成進(jìn)程間的通信。如圖7所示,我們fork三個(gè)PHP進(jìn)程,將進(jìn)程1的數(shù)據(jù),通過Socket文件發(fā)送到進(jìn)程3。

圖7:進(jìn)程間的通信

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 <?php function step1() { ????????$context = new ZMQContext(); ????????// Signal downstream to step 2 ????????$sender = new ZMQSocket($context, ZMQ::SOCKET_PAIR); ????????$sender-&gt;connect("ipc://step2.ipc"); ????????$sender-&gt;send("hello ,i am step1"); } function step2() { ????????$pid = pcntl_fork(); ????????if($pid == 0) { ????????????????step1(); ????????????????exit(); ????????} ????????$context = new ZMQContext(); ????????//? Bind to ipc: endpoint, then start upstream thread ????????$receiver = new ZMQSocket($context, ZMQ::SOCKET_PAIR); ????????$receiver-&gt;bind("ipc://step2.ipc"); ????????// Wait for signal?? ????????sleep(10); ????????$strings = $receiver-&gt;recv(); ????????echo "step2 receiver is $strings". PHP_EOL; ????????sleep(10); ????????// Signal downstream to step 3 ????????$sender = new ZMQSocket($context, ZMQ::SOCKET_PAIR); ????????$sender-&gt;connect("ipc://step3.ipc"); ????????$sender-&gt;send($strings); } // Start upstream thread then bind to icp: endpoint $pid = pcntl_fork(); if($pid == 0) { ????????step2(); ????????exit(); } $context = new ZMQContext(); $receiver = new ZMQSocket($context, ZMQ::SOCKET_PAIR); $receiver-&gt;bind("ipc://step3.ipc"); // Wait for signal $sr = $receiver-&gt;recv(); echo "the result is {$sr}".PHP_EOL;

在運(yùn)行中,我們可以看到多了兩個(gè)文件,如圖8所示。

圖8:運(yùn)行過程中生成的文件

八、利用ZeroMQ實(shí)現(xiàn)一個(gè)配置推送中心

當(dāng)我們將WEB代碼部署到集群上的時(shí)候,如果需要實(shí)時(shí)的將最新的配置信息,主動(dòng)的推送到各個(gè)機(jī)器節(jié)點(diǎn)。在此過程中,我們一定要保證,各個(gè)節(jié)點(diǎn)收到的信息的一致性和正確性,如果使用HTTP,由于他的無狀態(tài)性,我們無法保證信息的一致性,當(dāng)然,你可以使用HTTP來實(shí)現(xiàn),只是更復(fù)雜,為什么不用ZMQ?他能讓你更簡單的實(shí)現(xiàn)這些功能。

我們使用ZMQ的信息訂閱模式。在那個(gè)模式中,我們注意到,對(duì)于后來的加入節(jié)點(diǎn),始終會(huì)丟失在他加入之前,已經(jīng)發(fā)送的信息(Slow joiner)。我們可以開啟另外一個(gè)ZMQ的通信通道,用于報(bào)告當(dāng)前節(jié)點(diǎn)的情況(節(jié)點(diǎn)的身份、準(zhǔn)備狀態(tài)等),其結(jié)構(gòu)如圖9所示。

圖9:擴(kuò)展ZMQ的訂閱者模式

我們通過$context->getSocket(ZMQ::SOCKET_REQ);設(shè)置一個(gè)新的Request-Reply連接,來用于Subscriber向Publisher報(bào)告自己的身份信息,而Publisher則等待所有的Subscriber都連接上的時(shí)候,再選擇Publish自己的信息。

Subscriber端的程序如下:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 <?php $hostname = $_SERVER['argc'] &gt; 1 ? $_SERVER['argv'][1] : "s1"; $context = new ZMQContext(2); $sub = new ZMQSocket($context,ZMQ::SOCKET_SUB); $sub-&gt;connect("tcp://localhost:5561"); //$subscriber-&gt;setSockOpt(ZMQ::SOCKOPT_IDENTITY, $hostname); $sub-&gt;setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE,""); $client = $context-&gt;getSocket(ZMQ::SOCKET_REQ); $client-&gt;connect("tcp://localhost:5562"); while(1) { //$client-&gt;connect("tcp://localhost:5562"); $client-&gt;send($hostname); $version = $client-&gt;recv(); echo $version."\r\n"; if (!empty($version)) { $recive = $sub-&gt;recv(); $vars = json_decode($recive); var_dump($vars); } }

Publisher端的程序如下:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 <?php $CONFIG["TAOKE_BTS"]["ENABLE"] = true; $CONFIG["QP_BTS"]["ENABLE"] = true; $CONFIG["QP_BTS"]["TK_TEST"] = 13; $string = json_encode($CONFIG); $clients = array("s2","s1","s3"); $context = new ZMQContext(10); //Socket talk to clients $publisher = new ZMQSocket($context,ZMQ::SOCKET_PUB); $publisher-&gt;bind("tcp://*:5561"); //Socket to publish message $server = new ZMQSocket($context,ZMQ::SOCKET_REP); $server-&gt;bind("tcp://*:5562"); while(count($clients)!=0) { ??????$client_name = $server-&gt;recv(); ????????echo "{$client_name} is connect!\r\n"; if (in_array($client_name, $clients)) { //coming one client ????????$key = array_search($client_name, $clients); ????????unset($clients[$key]); ????????echo "$client_name has come in!\r\n"; ????????$server-&gt;send("Version is 2.0"); } else { ????????$server-&gt;send("You are a stranger!"); } } $publisher-&gt;send($string); ?&gt;

每個(gè)節(jié)點(diǎn)通過5562端口,使用Rep模式和Publisher連接,通過這個(gè)連接告之Publisher自己的機(jī)器名,而Publisher端通過白名單的方式,維護(hù)一個(gè)機(jī)器列表,當(dāng)機(jī)器列表中所有的機(jī)器連接上來以后,通過5561端口,將最新的配置信息發(fā)送出去。

后續(xù)的處理,Subscriber可以選擇將配置信息寫入到APC緩存,程序?qū)⑹冀K從緩存中讀取部分配置信息,Subscriber并將更新后的狀態(tài)信息,實(shí)時(shí)的通過5562報(bào)告給Publisher。

雖然,在本示例中不會(huì)出現(xiàn),但是,如果需要發(fā)布的信息量過大,在接受信息的過程中,Subscriber端突然中斷網(wǎng)絡(luò)(或者是程序崩潰),那么當(dāng)他在連接上來的時(shí)候,有部分信息就會(huì)丟失?ZMQ考慮到這個(gè)問題,通過$subscriber->setSockOpt(ZMQ::SOCKOPT_IDENTITY, $hostname);設(shè)置一個(gè)id,當(dāng)這個(gè)id的Subscriber重新連接上來的時(shí)候,他可以從上次中斷的地方,繼續(xù)接受信息,當(dāng)然,節(jié)點(diǎn)的中斷,不會(huì)影響其他的節(jié)點(diǎn)繼續(xù)的接受信息。

那么ZMQ是怎么實(shí)現(xiàn)斷線重連后,繼續(xù)發(fā)送信息呢 ?他會(huì)將斷開的Subscriber應(yīng)該接受到的信息發(fā)到內(nèi)存中,等待他重新上線后,將緩存的信息,繼續(xù)發(fā)送給他。當(dāng)然,內(nèi)存必然是有限的,過多就會(huì)出現(xiàn)內(nèi)存溢出。ZMQ通過

SetSockOpt(ZMQ::SOCKOPT_SWAP, 250000)設(shè)置Swap空間的大小,來防止out of memory and crash。最終,我們的程序運(yùn)行結(jié)果,如圖10所示。


圖10:配置中心的運(yùn)行結(jié)果

當(dāng)然,這只是一個(gè)大體的思路,如果應(yīng)用到實(shí)際的成產(chǎn)環(huán)境中,還需要考慮更多的問題,包含穩(wěn)定性,容錯(cuò)等等。然而,ZMQ由于高并發(fā),以及穩(wěn)定性和易用性,前景不錯(cuò),他的目標(biāo)是進(jìn)入Linux內(nèi)核,我們期待那一天的到來。

參考資料 :

http://www.infoq.com/cn/news/2010/09/introduction-zero-mq Infoq對(duì)zeromq的簡介

http://zguide.zeromq.org/page:all ZeroMQ的guide文檔

來自: www.searchtb.com


總結(jié)

以上是生活随笔為你收集整理的ZMQ研究的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。