日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

利用zookeeper实现发布订阅模式

發布時間:2025/3/14 编程问答 12 豆豆
生活随笔 收集整理的這篇文章主要介紹了 利用zookeeper实现发布订阅模式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

zookeeper應用

發布訂閱

zk實現的方式是推拉結合,Client想服務端注冊自己需要關注的節點,一旦節點的數據發生變更,那么Server會向對應的客戶端發送Watcher事件通知,客戶端接收到這個消息后,需要主動到服務端獲取最新的數據。

目前很多應用使用發布訂閱都不是用zk的這種方式,比較典型的純的推模式和拉模式,這個之前有記錄過Notify和MetaQ的比較,不是本篇的重點。本次主要是利用zookeeper來實現以下發布訂閱這種功能。

搭建了一個zk環境,手動創建了一個節點/publish,客戶端發布者代碼如下:

package com.wpr.zk.pulish;import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat;import java.io.IOException; import java.util.concurrent.CountDownLatch;/*** 利用zk來模擬發布訂閱模式* Created by peirong.wpr on 2017/4/5.*/ public class Publish implements Watcher{private static CountDownLatch latch = new CountDownLatch(1);private static Stat stat = new Stat();private static ZooKeeper zk =null;private final static Integer SESSION_TIMEOUT = 5000;public static void main(String[] args) {try {String path ="/publish";zk = new ZooKeeper("192.168.109.130:2181",SESSION_TIMEOUT,new Publish());latch.await();System.out.println("zk connection");byte[] temp = zk.getData(path,true,stat);System.out.println("init data :pulish node data"+new String(temp));int i=0;while(true){System.out.println( "publish new Data:"+i);zk.setData(path,String.valueOf(i).getBytes(),-1);Thread.sleep(5000L);i++;}} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}public void process(WatchedEvent event) {if(Event.KeeperState.SyncConnected == event.getState()){System.out.println("receive watched event:"+event);System.out.println(event.getState());latch.countDown();}} }

訂閱者代碼如下:

package com.wpr.zk.pulish;import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat;import java.io.IOException; import java.util.concurrent.CountDownLatch;/*** Created by peirong.wpr on 2017/4/5.*/ public class Subscribe implements Watcher {private static CountDownLatch latch = new CountDownLatch(1);private static Stat stat = new Stat();private static ZooKeeper zk =null;private final static Integer SESSION_TIMEOUT = 5000;public static void main(String[] args) {try {String path ="/publish";zk = new ZooKeeper("192.168.109.130:2181",SESSION_TIMEOUT,new Subscribe());latch.await();System.out.println("zk connection");byte[] temp = zk.getData(path,true,stat);System.out.println("init data :pulish node data"+new String(temp));int i=0;while(true){Thread.sleep(Integer.MAX_VALUE);}} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}public void process(WatchedEvent event) {if(Event.KeeperState.SyncConnected == event.getState()){if(Event.EventType.None == event.getType() && event.getPath() == null){latch.countDown();}else if(event.getType() == Event.EventType.NodeDataChanged){//Clinet需要去拉取最新的數據信息try {byte[] newByte = zk.getData(event.getPath(),true,stat);System.out.println("path:"+event.getPath()+"\tdata has changed.\t new Data :"+ new String(newByte));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}} }

轉載于:https://www.cnblogs.com/kakaxisir/p/6667279.html

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的利用zookeeper实现发布订阅模式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。