日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ集群、镜像部署配置

發布時間:2023/12/31 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ集群、镜像部署配置 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

?

1?? RABBITMQ簡介及安裝

?

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

AMQP,即Advanced message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。

1?? RABBITMQ系統架構

?

RabbitMQ Server: 也叫broker server,是一種傳輸服務,負責維護一條從Producer到consumer的路線,保證數據能夠按照指定的方式進行傳輸。

Producer,數據的發送方。

Consumer,數據的接收方。

Exchanges 接收消息,轉發消息到綁定的隊列。主要使用3種類型:direct, topic, fanout。

Queue RabbitMQ內部存儲消息的對象。相同屬性的queue可以重復定義,但只有第一次定義的有效。

Bindings 綁定Exchanges和Queue之間的路由。

Connection: 就是一個TCP的連接。Producer和consumer都是通過TCP連接到RabbitMQ Server的。

Channel:虛擬連接。它建立在上述的TCP連接中。數據流動都是在Channel中進行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。

2?? RABBITMQ安裝和啟動

Ubuntu 安裝

編輯?/etc/apt/sources.list 文件添加如下行

deb http://www.rabbitmq.com/debian/ testing main

執行更新軟件源命令

#apt-get update.

安裝rabbitmq

#apt-get install rabbitmq-server

啟動rabbitmq

# service rabbitmq-server start

?

Centos 安裝

從官網下載erlang和rabbitmq 的rpm包

安裝erlang

#rpm –i erlang-17.4-1.el6.x86_64.rpm

安裝rabbitmq

#rpm –i rabbitmq-server-3.5.3-1.noarch.rpm

rabbitmq的rpm安裝包不能指定安裝路徑

3?? RABBITMQ配置文件

RABBITMQ配置文件位置:

  • Generic UNIX-?$RABBITMQ_HOME/etc/rabbitmq/
  • Debian-?/etc/rabbitmq/
  • RPM-?/etc/rabbitmq/

?

4?? RABBITMQ頁面(web)管理

啟用web插件

#rabbitmq-plugins enable rabbitmq_management #啟用

關閉web插件

# rabbitmq-plugins disable rabbitmq_management

可以用默認賬號guest,guest登陸http://主機IP:15672,如果要遠程登錄,需要先創建帳戶,可查看下一節。

5?? RABBITMQ創建帳戶

如果是集群的話,只要在一臺主機設置即可,其它會自動同步。

#rabbitmqctl?add_user?iom 123456? –iom為新建的用戶,123456為密碼

#rabbitmqctl? set_user_tags?iom administrator –將用戶設置為管理員角色

#rabbitmqctl?set_permissions?-p?/?iom “.*”?“.*”?“.*”

–在 / 虛擬主機里設置iom用戶配置權限,寫權限,讀權限。.*是正則表達式里用法。rabbitmq的權限是根據不同的虛擬主機(virtual hosts)配置的,同用戶在不同的虛擬主機(virtual hosts)里可能不一樣。

2?? RABBITMQ安全特性

6?? publish消息確認機制

如果采用標準的 AMQP 協議,則唯一能夠保證消息不會丟失的方式是利用事務機制 — 令 channel 處于 transactional 模式、向其 publish 消息、執行 commit 動作。在這種方式下,事務機制會帶來大量的多余開銷,并會導致吞吐量下降 250% 。為了補救事務帶來的問題,引入了 confirmation 機制(即 Publisher Confirm)。

confirm 機制是在channel上使用 confirm.select方法,處于 transactional 模式的 channel 不能再被設置成 confirm 模式,反之亦然。

在 channel 被設置成 confirm 模式之后,所有被 publish 的后續消息都將被 confirm(即 ack) 或者被 nack 一次。但是沒有對消息被 confirm 的快慢做任何保證,并且同一條消息不會既被 confirm 又被 nack 。

RabbitMQ 將在下面的情況中對消息進行 confirm :

RabbitMQ發現當前消息無法被路由到指定的 queues 中;

非持久屬性的消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中);

持久消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中),并被持久化到了磁盤(被 fsync);

持久消息從其所在的所有 queue 中被 consume 了(如果必要則會被 acknowledge)。

?

7?? consumer消息確認機制

為了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。

如果沒啟動消息確認機制,RabbitMQ在consumer收到消息后就會把消息刪除。

啟用消息確認后,consumer在處理數據后應通過回調函數顯示發送ack, RabbitMQ收到ack后才會刪掉數據。如果consumer一段時間內不回饋,RabbitMQ會將該消息重新分配給另外一個綁定在該隊列上的consumer。另一種情況是consumer斷開連接,但是獲取到的消息沒有回饋,則RabbitMQ同樣重新分配。

注意:如果consumer 沒調用basic.qos 方法設置prefetch_count=1,那即使該consumer有未ack的messages,RabbitMQ仍會繼續發messages給它。

8?? 消息持久化

消息確認機制確保了consumer退出時消息不會丟失,但如果是RabbitMQ本身因故障退出,消息還是會丟失。為了保證在RabbitMQ出現意外情況時數據仍沒有丟失,需要將queue和message都要持久化。

queue持久化:channel.queue_declare(queue=’hello’, durable=True)

message持久化:channel.basic_publish(exchange=”,

routing_key=”task_queue”,

body=message,

properties=pika.BasicProperties(

delivery_mode = 2,)? #消息持久化

)

即使有消息持久化,數據也有可能丟失,因為rabbitmq是先將數據緩存起來,到一定條件才保存到硬盤上,這期間rabbitmq出現意外數據有可能丟失。

網上有測試表明:持久化會對RabbitMQ的性能造成比較大的影響,可能會下降10倍不止。

3?? RABBITMQ集群

1?? RABBITMQ集群基本概念

一個RABBITMQ集 群中可以共享user,virtualhosts,queues(開啟Highly Available Queues),exchanges等。但message只會在創建的節點上傳輸。當message進入A節點的queue中后,consumer從B節點拉取時,RabbitMQ會臨時在A、B間進行消息傳輸,把A中的消息實體取出并經過B發送給consumer。所以consumer應盡量連接每一個節點,從中取消息。

RABBITMQ的集群節點包括內存節點、磁盤節點。內存節點的元數據僅放在內存中,性能比磁盤節點會有所提升。不過,如果在投遞message時,打開了message的持久化,那么內存節點的性能只能體現在資源管理上,比如增加或刪除隊列(queue),虛擬主機(vrtual hosts),交換機(exchange)等,發送和接受message速度同磁盤節點一樣。一個集群至少要有一個磁盤節點。

2?? RABBITMQ搭建集群

環境:有三臺主機,主機名和IP如下,rabbitmq的執行用戶為rabbitmq,所屬組為rabbitmq。

主機名??? ??? ?IP

rabbitmq1 192.168.10.2

rabbitmq2 192.168.10.3

rabbitmq3? 192.168.10.4

同步erlang.cookie

殺掉rabbitmq2和rabbitmq3的rabbitmq進程:

#ps –ef|grep rab|awk ‘{print $2}’|xargs kill -9。–用service rabbitmq-servier stop停會有遺留進程。

登陸rabbitmq1(rabbitmq1上的rabbitmq服務不能關),執行

#cd /var/lib/rabbitmq??? ?–進入erlang.cookie所在目錄,只有ls –al能看見此文件

#chmod 777 .erlang* ???? ?–該文件默認為400權限,為方便傳輸,先修改權限,非必須操作

#scp .erlang.cookie??rabbitmq@192.168.10.3:/var/lib/rabbitmq?–將此文件傳給另外兩條主機

#scp .erlang.cookie??rabbitmq@192.168.10.4:/var/lib/rabbitmq

#chmod 400 .er*????????? –恢復文件權限

分別在rabbitmq2和rabbitmq3 上執行

#chown rabbitmq:rabbitmq .er*? –修改文件所屬用戶和所屬組

#chmod 400 .er*? –修改文件權限

#service rabbitmq-server start

加入集群

查詢rabbitmq1節點名稱

#rabbitmqctl cluster_status

Cluster status of node rabbit@rabbitmq1 …

[{nodes,[{disc,[rabbit@rabbitmq1]}]},{running_nodes,[rabbit@ rabbitmq1]}]

…done.

rabbitmq2 加入rabbitmq1 節點.

# rabbitmqctl stop_app?? –關掉rabbitmq2服務

# rabbitmqctl join_cluster rabbit@rabbitmq1 — rabbitmq2加入rabbitmq1, rabbitmq2必須能通過rabbitmq1的主機名ping通rabbitmq1。

# rabbitmqctl start_app? –啟動rabbitmq2服務

查看集群信息

# rabbitmqctl cluster_status –此時里面就應該能看見兩個節點。集群名字為rabbit@rabbitmq。

用相同的方法把rabbitmq3也加入rabbitmq1。

更改節點屬性

#rabbitmqctl stop_app? –停止rabbitmq服務

#rabbitmqctl change_cluster_node_type disc/ram –更改節點為磁盤或內存節點

#rabbitmqctl start_app –開啟rabbitmq服務

?

查看集群狀態

#rabbitmqctl cluster_status

[{nodes,[{disc,[rabbit@rabbitmq1,rabbit@rabbitmq2,rabbit@rabbitmq3]}]}, {running_nodes,[rabbit@rabbitmq1,rabbit@rabbitmq2, rabbit@rabbitmq3]}]…done.

?

–第一行是集群中的節點成員,disc表示這些都是磁盤節點。

–第二行是正在運行的節點成員

3?? RABBITMQ退出集群

假設要把rabbitmq2退出集群

在rabbitmq2上執行

#rabbitmqctl stop_app

#rabbitmqctl reset

#rabbitmqctl start_app

?

在集群主節點上執行

# rabbitmqctl forget_cluster_node rabbit@rabbitmq2

4?? RABBITMQ集群重啟

集群重啟時,最后一個掛掉的節點應該第一個重啟,如果因特殊原因(比如同時斷電),而不知道哪個節點最后一個掛掉。可用以下方法重啟:

先在一個節點上執行

#rabbitmqctl force_boot

#service rabbitmq-server start

在其他節點上執行

#service rabbitmq-server start

查看cluster狀態是否正常(要在所有節點上查詢)。

#rabbitmqctl cluster_status

?

如果有節點沒加入集群,可以先退出集群,然后再重新加入集群。

上述方法不適合內存節點重啟,內存節點重啟的時候是會去磁盤節點同步數據,如果磁盤節點沒起來,內存節點一直失敗。

5?? 注意事項

  • cookie在所有節點上必須完全一樣,同步時一定要注意。
  • erlang是通過主機名來連接服務,必須保證各個主機名之間可以ping通。可以通過編輯/etc/hosts來手工添加主機名和IP對應關系。如果主機名ping不通,rabbitmq服務啟動會失敗。
  • 如果queue是非持久化queue,則如果創建queue的那個節點失敗,發送方和接收方可以創建同樣的queue繼續運作。但如果是持久化queue,則只能等創建queue的那個節點恢復后才能繼續服務。
  • 在集群元數據有變動的時候需要有disk node在線,但是在節點加入或退出的時候所有的disk node必須全部在線。如果沒有正確退出disk node,集群會認為這個節點當掉了,在這個節點恢復之前不要加入其它節點。.

?

4?? RABBITMQ HA

1?? 鏡像隊列概念

鏡像隊列可以同步queue和message,當主queue掛掉,從queue中會有一個變為主queue來接替工作。

鏡像隊列是基于普通的集群模式的,所以你還是得先配置普通集群,然后才能設置鏡像隊列。

鏡像隊列設置后,會分一個主節點和多個從節點,如果主節點宕機,從節點會有一個選為主節點,原先的主節點起來后會變為從節點。

queue和message雖然會存在所有鏡像隊列中,但客戶端讀取時不論物理面連接的主節點還是從節點,都是從主節點讀取數據,然后主節點再將queue和message的狀態同步給從節點,因此多個客戶端連接不同的鏡像隊列不會產生同一message被多次接受的情況。

2?? 配置鏡像隊列

沿用3.2的環境,現在我們把名為“hello”的隊列設置為同步給所有節點

#rabbitmqctl set_policy ?ha-all “hello” ‘{“ha-mode”:”all”}’

ha-all 是同步模式,指同步給所有節點,還有另外兩種模式ha-exactly表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定,ha-nodes表示在指定的節點上進行鏡像,節點名稱通過ha-params指定;

hello 是同步的隊列名,可以用正則表達式匹配;

{“ha-mode”:”all”} 表示同步給所有,同步模式的不同,此參數也不同。

執行上面命令后,可以在web管理界面查看queue 頁面,里面hello隊列的node節點后會出現+2標簽,表示有2個從節點,而主節點則是當前顯示的node(xf7021是測試用的名字,按4-2應該為rabbitmq(1-3))。

?

5?? keepalived和執行腳本

1?? A-keepalived配置

紅字為手工加的備注,原文件里沒有。

vi /etc/keepalived/keepalived.cnf文件

?

global_defs {

router_id LVS_MASTER?? }

?

vrrp_instance VI_1 {

state MASTER

interface eth0

virtual_router_id 51

priority 100

advert_int 1

authentication {

auth_type PASS

auth_pass 1111

}

virtual_ipaddress {

192.168.10.251/24????? #rabbitmq

}

}

?

virtual_server 192.168.10.251 5672 {

delay_loop 6

lb_algo rr

lb_kind DR

protocol TCP

?

real_server 192.168.10.3 5672 {

weight 3

TCP_CHECK {

connect_timeout 3

nb_get_retry 3

delay_before_retry 3

connect_port 5672

}

}

?

real_server 192.168.10.4 5672 {

weight 3

TCP_CHECK {

connect_timeout 3

nb_get_retry 3

delay_before_retry 3

connect_port 5672

}

}

}

?

2?? rabbitmq 服務器上執行的腳本

lvs_rabbitmq.sh腳本內容:

#!/bin/bash

VIP=192.168.10.251

case “$1” in

start)

ifconfig lo:0 $VIP netmask 255.255.255.255 broadcast $VIP

/sbin/route add -host $VIP dev lo:0

echo “1” >/proc/sys/net/ipv4/conf/lo/arp_ignore

echo “2” >/proc/sys/net/ipv4/conf/lo/arp_announce

echo “1” >/proc/sys/net/ipv4/conf/all/arp_ignore

echo “2” >/proc/sys/net/ipv4/conf/all/arp_announce

sysctl -p >/dev/null 2>&1

echo “lvs_vip server start ok!”;;

stop)

ifconfig lo:0 down

/sbin/route del $VIP >/dev/null 2>&1

?

echo “0” >/proc/sys/net/ipv4/conf/lo/arp_ignore

echo “0” >/proc/sys/net/ipv4/conf/lo/arp_announce

echo “0” >/proc/sys/net/ipv4/conf/all/arp_ignore

echo “0” >/proc/sys/net/ipv4/conf/all/arp_announce

?

echo “lvs_vip server stoped.”;;

?

*)

echo “arg start|stop.”

exit 1

esac

?

exit 0

?

?

6?? RABBITMQ監控

RabbitMQ監控項目很多,可通過web管理界面監控。

?

OVERVIEW頁面下有4個標簽。主要關注totals和nodes兩個。

1?? totals

?

ready為待處理消息量,total為總消息量。

publish為每秒發送消息量,deliver為每秒接受消息量。

下面5個灰色長方塊分別代表對應的模塊連接數。

?

2?? nodes

?

name為節點名稱,后面5個藍色方塊分別代表文件打開數,socket連接數,erlang processes(暫時未知),內存占用兩,磁盤空余量;info里顯示節點屬性,將鼠標放在內容上會顯示對應的統計內容。

7?? 系統測試

3?? 測試環境

主機名 ?????? ?IP

VIP:????? 192.168.10.251

client???? 192.168.10.2? ??–本測試發送(producer)和接收(consumer)在同一臺機器

rabbitmq1 192.168.10.3

rabbitmq2? 192.168.10.4

?

4?? 準備工作

負載機啟動keepalived

# service keepalived start

?

?

rabbitmq1和rabbitmq2執行5-2的腳本

#./lvs_rabbitmq.sh start

?

按第3和第4章的方法組建集群,配置鏡像隊列,節點類型最好都設置為磁盤節點。

按第1-5創建用戶。

?

5?? 客戶機測試腳本

測試用RabbitMQ的python語言客戶端,注意python是靠縮進量來區分語句塊。紅色部分為注釋,源碼上沒有。

?

發送源碼:

#vi send.py

?

#!/usr/bin/env python

import pika

import time

credentials=pika.PlainCredentials(‘iom’,’123456′)??? –配置連接的用戶名和密碼

parameters=pika.ConnectionParameters(‘192.168.10.251′,5672,’/’,credentials)

connection=pika.BlockingConnection(parameters)

channel=connection.channel()

channel.queue_declare(queue=’hello’)

?

?

count=0

while count<9999:

message=’Hello World’+str(count)

count=count+1

channel.basic_publish(exchange=”,routing_key=’hello’,body=message)

print “Sent %s” %(message)

time.sleep(1)

connection.close()

?

接收源碼

#vi receive.py

#!/usr/bin/env python

import pika

connection=pika.BlockingConnection(pika.ConnectionParameters(host=’xf7027′))

channel=connection.channel()

channel.queue_declare(queue=’hello’)

print ‘[*] Waiting for message.To exit press CTRL+C’

def callback(ch,method,properties,body):

print “[x] Received %r” %(body,)

?

channel.basic_consume(callback,queue=’hello’,no_ack=True)

channel.start_consuming()

6?? 測試過程

測試keepalived分配

在客戶機上執行

#python send.py

?

在負載機上執行

#watch ipvsadm –Ln

可以看到rabbitmq1或rabbitmq2的activeconn列數值為1。

?

客戶機重新執行發送程序

#python send.py

在負載機上可以看到另一個rabbitmq服務的activeconn 列數值也變為1。

?

?

?

測試容災性:

?

在客戶機上分別執行發送和接受程序。

#python send.py

#python receive.py

然后關掉一個rabbitmq節點,如果關掉的正好是客戶機連的那個節點的話,客戶機發送和接收程序會報錯退出(程序本身如果有錯誤重發機制則不受任何影響)。如果關掉的是另外的節點,程序不受任何影響。

轉載于:https://my.oschina.net/jiaoyanli/blog/821762

總結

以上是生活随笔為你收集整理的RabbitMQ集群、镜像部署配置的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。