日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMq--4--集群(转载)

發(fā)布時間:2024/9/15 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMq--4--集群(转载) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

RabbitMQ消息服務用戶手冊

1?基礎知識

1.1?集群總體概述

Rabbitmq?Broker集群是多個erlang節(jié)點的邏輯組,每個節(jié)點運行Rabbitmq應用,他們之間共享用戶、虛擬主機、隊列、exchange、綁定和運行時參數(shù)。

1.2?集群復制信息

除了message queue(存在一個節(jié)點,從其他節(jié)點都可見、訪問該隊列,要實現(xiàn)queue的復制就需要做queue的HA)之外,任何一個Rabbitmq broker上的所有操作的data和state都會在所有的節(jié)點之間進行復制。

1.3?集群運行前提

1、集群所有節(jié)點必須運行相同的erlang及Rabbitmq版本。

2、hostname解析,節(jié)點之間通過域名相互通信,本文為3個node的集群,采用配置hosts的形式。

1.4?集群互通方式

1、集群所有節(jié)點必須運行相同的erlang及Rabbitmq版本hostname解析,節(jié)點之間通過域名相互通信,本文為3個node的集群,采用配置hosts的形式。

1.5?端口及其用途

1、5672 客戶端連接端口。

2、15672 web管控臺端口。

3、25672 集群通信端口。

1.6?集群配置方式

通過rabbitmqctl手工配置的方式。

1.7?集群故障處理

1、rabbitmq broker集群允許個體節(jié)點宕機。

2、對應集群的的網(wǎng)絡分區(qū)問題(network partitions)集群推薦用于LAN環(huán)境,不適用WAN環(huán)境;要通過WAN連接broker,Shovel or Federation插件是最佳解決方案(Shovel or Federation不同于集群:注Shovel為中心服務遠程異步復制機制,稍后會有介紹)。

1.8?節(jié)點運行模式

為保證數(shù)據(jù)持久性,目前所有node節(jié)點跑在disk模式,如果今后壓力大,需要提高性能,考慮采用ram模式。

1.9?集群認證方式

通過Erlang Cookie,相當于共享秘鑰的概念,長度任意,只要所有節(jié)點都一致即可。rabbitmq server在啟動的時候,erlang VM會自動創(chuàng)建一個隨機的cookie文件。cookie文件的位置: /var/lib/rabbitmq/.erlang.cookie 或者/root/.erlang.cookie。我們的為保證cookie的完全一致,采用從一個節(jié)點copy的方式,實現(xiàn)各個節(jié)點的cookie文件一致。

2?集群搭建

2.1?集群節(jié)點安裝

1、安裝依賴包

PS:安裝rabbitmq所需要的依賴包

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

?

2、下載安裝包

wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpmwget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpmwget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

3、安裝服務命令?

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpmrpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpmrpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

4、修改集群用戶與連接心跳檢測?

注意修改vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app文件修改:loopback_users 中的 <<"guest">>,只保留guest修改:heartbeat 為1

5、安裝管理插件?

//首先啟動服務/etc/init.d/rabbitmq-server start stop status restart//查看服務有沒有啟動: lsof -i:5672rabbitmq-plugins enable rabbitmq_management//可查看管理端口有沒有啟動: lsof -i:15672 或者 netstat -tnlp|grep 15672

6、服務指令?

/etc/init.d/rabbitmq-server start stop status restart驗證單個節(jié)點是否安裝成功:http://192.168.11.71:15672/Ps:以上操作三個節(jié)點(71、72、73)同時進行操作


PS:選擇76、77、78任意一個節(jié)點為Master(這里選擇76為Master),也就是說我們需要把76的Cookie文件同步到77、78節(jié)點上去,進入/var/lib/rabbitmq目錄下,把/var/lib/rabbitmq/.erlang.cookie文件的權限修改為777,原來是400;然后把.erlang.cookie文件copy到各個節(jié)點下;最后把所有cookie文件權限還原為400即可。

2.2?文件同步步驟

/etc/init.d/rabbitmq-server stop //進入目錄修改權限;遠程copy77、78節(jié)點,比如:scp /var/lib/rabbitmq/.erlang.cookie 到192.168.11.77和192.168.11.78中

?

2.3?組成集群步驟?

1、停止MQ服務

PS:我們首先停止3個節(jié)點的服務

rabbitmqctl stop

PS:接下來我們就可以使用集群命令,配置76、77、78為集群模式,3個節(jié)點(76、77、78)執(zhí)行啟動命令,后續(xù)啟動集群使用此命令即可。

rabbitmq-server -detached

2、組成集群操作

//注意做這個步驟的時候:需要配置/etc/hosts 必須相互能夠?qū)ぶ返絙hz77:rabbitmqctl stop_appbhz77:rabbitmqctl join_cluster --ram rabbit@bhz76bhz77:rabbitmqctl start_appbhz78:rabbitmqctl stop_appbhz78:rabbitmqctl join_cluster rabbit@bhz76bhz78:rabbitmqctl start_app//在另外其他節(jié)點上操作要移除的集群節(jié)點rabbitmqctl forget_cluster_node rabbit@bhz24

3、slave加入集群操作(重新加入集群也是如此,以最開始的主節(jié)點為加入節(jié)點)

4、修改集群名稱

PS:修改集群名稱(默認為第一個node名稱):

rabbitmqctl set_cluster_name rabbitmq_cluster1

5、查看集群狀態(tài)?

PS:最后在集群的任意一個節(jié)點執(zhí)行命令:查看集群狀態(tài)

rabbitmqctl cluster_status

?

?

6、管控臺界面

PS:?訪問任意一個管控臺節(jié)點:http://192.168.11.71:15672?如圖所示

?

?

?

2.4?配置鏡像隊列

PS:設置鏡像隊列策略(在任意一個節(jié)點上執(zhí)行)

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

?PS:將所有隊列設置為鏡像隊列,即隊列會被復制到各個節(jié)點,各個節(jié)點狀態(tài)一致,RabbitMQ高可用集群就已經(jīng)搭建好了,我們可以重啟服務,查看其隊列是否在從節(jié)點同步。

2.5?安裝Ha-Proxy?

1、Haproxy簡介

HAProxy是一款提供高可用性、負載均衡以及基于TCP和HTTP應用的代理軟件,HAProxy是完全免費的、借助HAProxy可以快速并且可靠的提供基于TCP和HTTP應用的代理解決方案。

HAProxy適用于那些負載較大的web站點,這些站點通常又需要會話保持或七層處理。

HAProxy可以支持數(shù)以萬計的并發(fā)連接,并且HAProxy的運行模式使得它可以很簡單安全的整合進架構中,同時可以保護web服務器不被暴露到網(wǎng)絡上。

2、Haproxy安裝

PS:79、80節(jié)點同時安裝Haproxy,下面步驟統(tǒng)一

//下載依賴包yum install gcc vim wget//下載haproxywget http://www.haproxy.org/download/1.6/src/haproxy-1.6.5.tar.gz//解壓tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local//進入目錄、進行編譯、安裝cd /usr/local/haproxy-1.6.5make TARGET=linux31 PREFIX=/usr/local/haproxymake install PREFIX=/usr/local/haproxymkdir /etc/haproxy//賦權groupadd -r -g 149 haproxyuseradd -g haproxy -r -s /sbin/nologin -u 149 haproxy//創(chuàng)建haproxy配置文件touch /etc/haproxy/haproxy.cfg

?


3、Haproxy配置?

PS:haproxy?配置文件haproxy.cfg詳解

vim /etc/haproxy/haproxy.cfg

?

#logging optionsgloballog 127.0.0.1 local0 infomaxconn 5120chroot /usr/local/haproxyuid 99gid 99daemonquietnbproc 20pidfile /var/run/haproxy.piddefaultslog global#使用4層代理模式,”mode http”為7層代理模式mode tcp#if you set mode to tcp,then you nust change tcplog into httplogoption tcplogoption dontlognullretries 3option redispatchmaxconn 2000contimeout 5s##客戶端空閑超時時間為 60秒 則HA 發(fā)起重連機制clitimeout 60s##服務器端鏈接超時時間為 15秒 則HA 發(fā)起重連機制srvtimeout 15s#front-end IP for consumers and producterslisten rabbitmq_clusterbind 0.0.0.0:5672#配置TCP模式mode tcp#balance url_param userid#balance url_param session_id check_post 64#balance hdr(User-Agent)#balance hdr(host)#balance hdr(Host) use_domain_only#balance rdp-cookie#balance leastconn#balance source //ip#簡單的輪詢balance roundrobin#rabbitmq集群節(jié)點配置 #inter 每隔五秒對mq集群做健康檢查, 2次正確證明服務器可用,2次失敗證明服務器不可用,并且配置主備機制server bhz76 192.168.11.76:5672 check inter 5000 rise 2 fall 2server bhz77 192.168.11.77:5672 check inter 5000 rise 2 fall 2server bhz78 192.168.11.78:5672 check inter 5000 rise 2 fall 2#配置haproxy web監(jiān)控,查看統(tǒng)計信息listen statsbind 192.168.11.79:8100mode httpoption httplogstats enable#設置haproxy監(jiān)控地址為http://localhost:8100/rabbitmq-statsstats uri /rabbitmq-statsstats refresh 5s

4、啟動haproxy

/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg//查看haproxy進程狀態(tài)ps -ef | grep haproxy

5、訪問haproxy?

PS:訪問如下地址可以對rmq節(jié)點進行監(jiān)控:http://192.168.1.27:8100/rabbitmq-stats

?

?

?

6、關閉haproxy

killall haproxyps -ef | grep haproxy

2.6?安裝KeepAlived?

?

1、Keepalived簡介

Keepalived,它是一個高性能的服務器高可用或熱備解決方案,Keepalived主要來防止服務器單點故障的發(fā)生問題,可以通過其與Nginx、Haproxy等反向代理的負載均衡服務器配合實現(xiàn)web服務端的高可用。Keepalived以VRRP協(xié)議為實現(xiàn)基礎,用VRRP協(xié)議來實現(xiàn)高可用性(HA).VRRP(Virtual Router Redundancy Protocol)協(xié)議是用于實現(xiàn)路由器冗余的協(xié)議,VRRP協(xié)議將兩臺或多臺路由器設備虛擬成一個設備,對外提供虛擬路由器IP(一個或多個)。

?

2、Keepalived安裝

PS:下載地址:http://www.keepalived.org/download.html

//安裝所需軟件包yum install -y openssl openssl-devel//下載wget http://www.keepalived.org/software/keepalived-1.2.18.tar.gz//解壓、編譯、安裝tar -zxvf keepalived-1.2.18.tar.gz -C /usr/local/cd keepalived-1.2.18/ && ./configure --prefix=/usr/local/keepalivedmake && make install//將keepalived安裝成Linux系統(tǒng)服務,因為沒有使用keepalived的默認安裝路徑(默認路徑:/usr/local),安裝完成之后,需要做一些修改工作//首先創(chuàng)建文件夾,將keepalived配置文件進行復制:mkdir /etc/keepalivedcp /usr/local/keepalived/etc/keepalived/keepalived.conf /etc/keepalived///然后復制keepalived腳本文件:cp /usr/local/keepalived/etc/rc.d/init.d/keepalived /etc/init.d/cp /usr/local/keepalived/etc/sysconfig/keepalived /etc/sysconfig/ln -s /usr/local/sbin/keepalived /usr/sbin/ln -s /usr/local/keepalived/sbin/keepalived /sbin///可以設置開機啟動:chkconfig keepalived on,到此我們安裝完畢!chkconfig keepalived on

?

3、Keepalived配置

PS:修改keepalived.conf配置文件

vim /etc/keepalived/keepalived.conf

PS:?79節(jié)點(Master)配置如下

?

! Configuration File for keepalivedglobal_defs {router_id bhz79 ##標識節(jié)點的字符串,通常為hostname}vrrp_script chk_haproxy {script "/etc/keepalived/haproxy_check.sh" ##執(zhí)行腳本位置interval 2 ##檢測時間間隔weight -20 ##如果條件成立則權重減20}vrrp_instance VI_1 {state MASTER ## 主節(jié)點為MASTER,備份節(jié)點為BACKUPinterface eth0 ## 綁定虛擬IP的網(wǎng)絡接口(網(wǎng)卡),與本機IP地址所在的網(wǎng)絡接口相同(我這里是eth0)virtual_router_id 79 ## 虛擬路由ID號(主備節(jié)點一定要相同)mcast_src_ip 192.168.11.79 ## 本機ip地址priority 100 ##優(yōu)先級配置(0-254的值)nopreemptadvert_int 1 ## 組播信息發(fā)送間隔,倆個節(jié)點必須配置一致,默認1sauthentication { ## 認證匹配auth_type PASSauth_pass bhz}track_script {chk_haproxy}virtual_ipaddress {192.168.11.70 ## 虛擬ip,可以指定多個}}

PS:?80節(jié)點(backup)配置如下

?

! Configuration File for keepalivedglobal_defs {router_id bhz80 ##標識節(jié)點的字符串,通常為hostname}vrrp_script chk_haproxy {script "/etc/keepalived/haproxy_check.sh" ##執(zhí)行腳本位置interval 2 ##檢測時間間隔weight -20 ##如果條件成立則權重減20}vrrp_instance VI_1 {state BACKUP ## 主節(jié)點為MASTER,備份節(jié)點為BACKUPinterface eno16777736 ## 綁定虛擬IP的網(wǎng)絡接口(網(wǎng)卡),與本機IP地址所在的網(wǎng)絡接口相同(我這里是eno16777736)virtual_router_id 79 ## 虛擬路由ID號(主備節(jié)點一定要相同)mcast_src_ip 192.168.11.80 ## 本機ip地址priority 90 ##優(yōu)先級配置(0-254的值)nopreemptadvert_int 1 ## 組播信息發(fā)送間隔,倆個節(jié)點必須配置一致,默認1sauthentication { ## 認證匹配auth_type PASSauth_pass bhz}track_script {chk_haproxy}virtual_ipaddress {192.168.1.70 ## 虛擬ip,可以指定多個}}

?

4、執(zhí)行腳本編寫

PS:添加文件位置為/etc/keepalived/haproxy_check.sh(79、80兩個節(jié)點文件內(nèi)容一致即可)

#!/bin/bashCOUNT=`ps -C haproxy --no-header |wc -l`if [ $COUNT -eq 0 ];then/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfgsleep 2if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];thenkillall keepalivedfifi

?

5、執(zhí)行腳本賦權?

PS:haproxy_check.sh腳本授權,賦予可執(zhí)行權限.

chmod +x /etc/keepalived/haproxy_check.sh

6、啟動keepalived?

PS:當我們啟動倆個haproxy節(jié)點以后,我們可以啟動keepalived服務程序:

//啟動兩臺機器的keepalivedservice keepalived start | stop | status | restart//查看狀態(tài)ps -ef | grep haproxyps -ef | grep keepalived

?

7、高可用測試?

PS:vip在27節(jié)點上

?

?

?

PS:27節(jié)點宕機測試:停掉27的keepalived服務即可。

?

?

?

PS:查看28節(jié)點狀態(tài):我們發(fā)現(xiàn)VIP漂移到了28節(jié)點上,那么28節(jié)點的haproxy可以繼續(xù)對外提供服務!

?

?

?

2.7?集群配置文件

創(chuàng)建如下配置文件位于:/etc/rabbitmq目錄下(這個目錄需要自己創(chuàng)建)

環(huán)境變量配置文件:rabbitmq-env.conf

配置信息配置文件:rabbitmq.config(可以不創(chuàng)建和配置,修改)

rabbitmq-env.conf配置文件:

---------------------------------------關鍵參數(shù)配置-------------------------------------------

RABBITMQ_NODE_IP_ADDRESS=本機IP地址

RABBITMQ_NODE_PORT=5672

RABBITMQ_LOG_BASE=/var/lib/rabbitmq/log

RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia

?

配置參考參數(shù)如下:

RABBITMQ_NODENAME=FZTEC-240088 節(jié)點名稱

RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 監(jiān)聽IP

RABBITMQ_NODE_PORT=5672 監(jiān)聽端口

RABBITMQ_LOG_BASE=/data/rabbitmq/log 日志目錄

RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins 插件目錄

RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia 后端存儲目錄

更詳細的配置參見: http://www.rabbitmq.com/configure.html#configuration-file

?

配置文件信息修改:

/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/ebin/rabbit.app和rabbitmq.config配置文件配置任意一個即可,我們進行配置如下:

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/ebin/rabbit.app

-------------------------------------關鍵參數(shù)配置----------------------------------------

tcp_listerners 設置rabbimq的監(jiān)聽端口,默認為[5672]。
disk_free_limit 磁盤低水位線,若磁盤容量低于指定值則停止接收數(shù)據(jù),默認值為{mem_relative, 1.0},即與內(nèi)存相關聯(lián)1:1,也可定制為多少byte.
vm_memory_high_watermark,設置內(nèi)存低水位線,若低于該水位線,則開啟流控機制,默認值是0.4,即內(nèi)存總量的40%。
hipe_compile 將部分rabbimq代碼用High Performance Erlang compiler編譯,可提升性能,該參數(shù)是實驗性,若出現(xiàn)erlang vm segfaults,應關掉。
force_fine_statistics, 該參數(shù)屬于rabbimq_management,若為true則進行精細化的統(tǒng)計,但會影響性能

------------------------------------------------------------------------------------------

更詳細的配置參見:http://www.rabbitmq.com/configure.html

?

3?Stream調(diào)研

3.1?Stream簡介

Spring Cloud Stream是創(chuàng)建消息驅(qū)動微服務應用的框架。Spring Cloud Stream是基于spring boot創(chuàng)建,用來建立單獨的/工業(yè)級spring應用,使用spring integration提供與消息代理之間的連接。

本文提供不同代理中的中間件配置,介紹了持久化發(fā)布訂閱機制,以及消費組以及分割的概念。
將注解@EnableBinding加到應用上就可以實現(xiàn)與消息代理的連接,@StreamListener注解加到方法上,使之可以接收處理流的事件。

?

3.2?官方參考文檔

原版:

http://docs.spring.io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle/#_main_concepts

翻譯:

http://blog.csdn.net/phyllisy/article/details/51352868

3.3?API操作手冊

3.3.1?生產(chǎn)者示例

PS:生產(chǎn)者yml配置

spring: cloud: stream:instanceCount: 3 bindings:output_channel: #輸出 生產(chǎn)者 group: queue-1 #指定相同的exchange-1和不同的queue 表示廣播模式 #指定相同的exchange和相同的queue表示集群負載均衡模式destination: exchange-1 # kafka:發(fā)布訂閱模型里面的topic rabbitmq: exchange的概念(但是exchange的類型那里設置呢?) binder: rabbit_clusterbinders: rabbit_cluster: type: rabbit environment: spring: rabbitmq: host: 192.168.1.27 port: 5672 username: guestpassword: guestvirtual-host: /

PS:?Barista接口為自定義管道?

package bhz.spring.cloud.stream;import org.springframework.cloud.stream.annotation.Input;import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.SubscribableChannel;/*** <B>中文類名:</B><BR>* <B>概要說明:</B><BR>* 這里的Barista接口是定義來作為后面類的參數(shù),這一接口定義來通道類型和通道名稱。* 通道名稱是作為配置用,通道類型則決定了app會使用這一通道進行發(fā)送消息還是從中接收消息。* @author bhz(Alienware)* @since 2015年11月22日*/public interface Barista {String INPUT_CHANNEL = "input_channel"; String OUTPUT_CHANNEL = "output_channel"; //注解@Input聲明了它是一個輸入類型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。這一名字與上述配置app2的配置文件中position1應該一致,表明注入了一個名字叫做input_channel的通道,它的類型是input,訂閱的主題是position2處聲明的mydest這個主題 @Input(Barista.INPUT_CHANNEL) SubscribableChannel loginput(); //注解@Output聲明了它是一個輸出類型的通道,名字是output_channel。這一名字與app1中通道名一致,表明注入了一個名字為output_channel的通道,類型是output,發(fā)布的主題名為mydest。 @Output(Barista.OUTPUT_CHANNEL)MessageChannel logoutput(); }

?


PS:?生產(chǎn)者消息投遞?

package bhz.spring.cloud.stream;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Service;@Service public class RabbitmqSender { @Autowired private Barista source; // 發(fā)送消息 public String sendMessage(Object message){ try{ source.logoutput().send(MessageBuilder.withPayload(message).build());System.out.println("發(fā)送數(shù)據(jù):" + message);}catch (Exception e){ e.printStackTrace(); } return null; } }

PS:?Spring Boot應用入口?

package bhz.spring.cloud.stream;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.stream.annotation.EnableBinding;@SpringBootApplication @EnableBinding(Barista.class) public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } }

?


3.3.2?消費者示例?

PS:消費者yml配置

spring: cloud: stream:instanceCount: 3bindings: input_channel: #輸出 生產(chǎn)者 destination: exchange-1 # kafka:發(fā)布訂閱模型里面的topic rabbitmq: exchange的概念(但是exchange的類型那里設置呢?) group: queue-1 #指定相同的exchange-1和不同的queue 表示廣播模式 #指定相同的exchange和相同的queue表示集群負載均衡模式binder: rabbit_clusterconsumer: concurrency: 1rabbit: bindings:input_channel: consumer: transacted: truetxSize: 10acknowledgeMode: MANUALdurableSubscription: truemaxConcurrency: 20recoveryInterval: 3000 binders: rabbit_cluster: type: rabbit environment: spring: rabbitmq: host: 192.168.1.27 port: 5672 username: guestpassword: guestvirtual-host: /

?

?

PS:?Barista接口為自定義管道

package bhz.spring.cloud.stream;import org.springframework.cloud.stream.annotation.Input;import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.SubscribableChannel;/*** <B>中文類名:</B><BR>* <B>概要說明:</B><BR>* 這里的Barista接口是定義來作為后面類的參數(shù),這一接口定義來通道類型和通道名稱。* 通道名稱是作為配置用,通道類型則決定了app會使用這一通道進行發(fā)送消息還是從中接收消息。* @author bhz(Alienware)* @since 2015年11月22日*/public interface Barista {String INPUT_CHANNEL = "input_channel"; String OUTPUT_CHANNEL = "output_channel"; //注解@Input聲明了它是一個輸入類型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。這一名字與上述配置app2的配置文件中position1應該一致,表明注入了一個名字叫做input_channel的通道,它的類型是input,訂閱的主題是position2處聲明的mydest這個主題 @Input(Barista.INPUT_CHANNEL) SubscribableChannel loginput(); //注解@Output聲明了它是一個輸出類型的通道,名字是output_channel。這一名字與app1中通道名一致,表明注入了一個名字為output_channel的通道,類型是output,發(fā)布的主題名為mydest。 @Output(Barista.OUTPUT_CHANNEL)MessageChannel logoutput(); }

?


PS:?消費者消息獲取?

package bhz.spring.cloud.stream;import java.io.IOException;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.cloud.stream.binding.ChannelBindingService;import org.springframework.cloud.stream.config.ChannelBindingServiceConfiguration;import org.springframework.cloud.stream.endpoint.ChannelsEndpoint;import org.springframework.integration.channel.PublishSubscribeChannel;import org.springframework.integration.channel.RendezvousChannel;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.SubscribableChannel;import org.springframework.messaging.core.MessageReceivingOperations;import org.springframework.messaging.core.MessageRequestReplyOperations;import org.springframework.messaging.support.ChannelInterceptor;import org.springframework.stereotype.Service;import com.rabbitmq.client.Channel;@EnableBinding(Barista.class)@Servicepublic class RabbitmqReceiver { @Autowired private Barista source; @StreamListener(Barista.INPUT_CHANNEL) public void receiver( Message message) { //廣播通道//PublishSubscribeChannel psc = new PublishSubscribeChannel();//確認通道//RendezvousChannel rc = new RendezvousChannel();Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);System.out.println("Input Stream 1 接受數(shù)據(jù):" + message);try {channel.basicAck(deliveryTag, false);} catch (IOException e) {e.printStackTrace();}} }

?

PS: Spring Boot應用入口?

package bhz.spring.cloud.stream;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.transaction.annotation.EnableTransactionManagement;@SpringBootApplication @EnableBinding(Barista.class)@EnableTransactionManagementpublic class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }

?

4.1?延遲隊列插件4?制定擴展

#step1:upload?the?‘rabbitmq_delayed_message_exchange-0.0.1.ez’ file:?

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

http://www.rabbitmq.com/community-plugins.html

https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange/v3.6.x#files/

?

#step2:PUT Directory:

/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/plugins

#step3:Then run the following command:

Start the rabbitmq cluster for command ## rabbitmq-server -detached

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

?

?

?

訪問地址:http://192.168.1.21:15672/#/exchanges,添加延遲隊列

?

?

總結

以上是生活随笔為你收集整理的RabbitMq--4--集群(转载)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。