RabbitMQ(二)工作队列
工作隊列(又稱任務隊列)的主要思想是避免立即執(zhí)行資源密集型任務,而不得不等待它完成。相反我們安排任務在之后執(zhí)行。我們把任務封裝為消息并將其發(fā)送到隊列。在后臺運行的工作進程將彈出任務并最終執(zhí)行作業(yè)。當有多個工作線程時,這些工作線程將一起處理這些任務。
?
3.1 輪詢分發(fā)消息
在這個案例中我們會啟動兩個工作線程,一個消息發(fā)送線程,我們來看看它們兩個工作線程是如何工作的。
3.1.1 抽取工具類
/*** 連接工廠創(chuàng)建信道工具類*/ public class RabbitMqUtils {//得到一個連接的 channelpublic static Channel getChannel() throws Exception{//創(chuàng)建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxx.xxx.xxx.xxx");factory.setUsername("admin");factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;} }3.1.2 啟動兩個工作線程
public class Worker01 {private static final String QUEUE_NAME = "hello"; ?public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback = (consumerTag, delivery) -> {String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:" + receivedMessage);};CancelCallback cancelCallback = (consumerTag) -> {System.out.println(consumerTag + "消費者取消消費接口回調(diào)邏輯");};System.out.println("C2 消費者啟動等待消費......");channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);} }3.1.3 啟動一個發(fā)送線程
public class Task01 {private static final String QUEUE_NAME = "hello"; ?public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel();) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);//從控制臺當中接受信息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("發(fā)送消息完成:" + message);}}} ? }3.1.4 結果展示
通過程序執(zhí)行發(fā)現(xiàn)生產(chǎn)者總共發(fā)送 4 個消息,消費者 1 和消費者 2 分別分得兩個消息,并且是按照有序的一個接收一次消息。
?
3.2 消費應答
3.2.1 概念
消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務并僅只完成 了部分突然它掛掉了,會發(fā)生什么情況。RabbitMQ 一旦向消費者傳遞了一條消息,便立即將該消息標記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息。以及后續(xù)發(fā)送給該消費這的消息,因為它無法接收到。
為了保證消息在發(fā)送過程中不丟失,rabbitmq 引入消息應答機制,消息應答就是:消費者在接收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。
3.2.2 自動應答
消息發(fā)送后立即被認為已經(jīng)傳送成功,這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權衡,因為這種模式如果消息在接收到之前,消費者那邊出現(xiàn)連接或者 channel 關閉,那么消息就丟失了,當然另一方面這種模式消費者那邊可以傳遞過載的消息,沒有對傳遞的消息數(shù)量進行限制, 當然這樣有可能使得消費者這邊由于接收太多還來不及處理的消息,導致這些消息的積壓,最終使得內(nèi)存耗盡,最終這些消費者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費者可以高效并以某種速率能夠處理這些消息的情況下使用。
3.2.3 手動消息應答的方式
| Channel.basicAck(用于肯定確認) | RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了 |
| Channel.basicNack(用于否定確認) | 拒絕消息,可以同時支持多個消息 |
| Channel.basicReject(用于否定確認) | 與 Channel.basicNack 相比少一個參數(shù) 不處理該消息了直接拒絕,可以將其丟棄了 |
-
channel.basicReject(deliveryTag, true);
-
basic.reject方法拒絕deliveryTag對應的消息,第二個參數(shù)是否requeue,true則重新入隊列,否則丟棄或者進入死信隊列。
-
該方法reject后,該消費者還是會消費到該條被reject的消息。
-
-
channel.basicNack(deliveryTag, false, true);
-
basic.nack方法為不確認deliveryTag對應的消息,第二個參數(shù)是否應用于多消息,第三個參數(shù)是否requeue,與basic.reject區(qū)別就是同時支持多個消息,可以nack該消費者先前接收未ack的所有消息。nack后的消息也會被自己消費到。
-
-
channel.basicRecover(true);
-
basic.recover是否恢復消息到隊列,參數(shù)是是否requeue,true則重新入隊列,并且盡可能的將之前recover的消息投遞給其他消費者消費,而不是自己再次消費。false則消息會重新被投遞給自己
-
3.2.4 Multiple 的解釋
手動應答的好處是可以批量應答并且減少網(wǎng)絡擁堵。
?
multiple 的 true 和 false 代表不同意思
-
true 代表批量應答 channel 上未應答的消息,比如說 channel 上有傳送 tag 的消息 5,6,7,8 當前 tag 是 8 那么此時5-8 的這些還未應答的消息都會被確認收到消息應答
-
false 同上面相比只會應答 tag=8 的消息 5,6,7 這三個消息依然不會被確認收到消息應答
?
?
3.2.5 消息重新入隊
如果消費者由于某些原因失去連接(其通道已關閉,連接已關閉或 TCP 連接丟失),導致消息 未發(fā)送 ACK 確認,RabbitMQ 將了解到消息未完全處理,并將對其重新排隊。如果此時其他消費者可以處理,它將很快將其重新分發(fā)給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確保不會丟失任何消息。
?
3.2.6 消息手動應答代碼
默認消息采用的是自動應答,所以我們要想實現(xiàn)消息消費過程中不丟失,需要把自動應答改 為手動應答,消費者在上面代碼的基礎上增加下面代碼。
?
3.2.7 手動應答效果演示
正常情況下消息發(fā)送方發(fā)送兩個消息 C1 和 C2 分別接收到消息并進行處理
在發(fā)送者發(fā)送消息 dd,發(fā)出消息之后的把 C2 消費者停掉,按理說該 C2 來處理該消息,但是由于它處理時間較長,在還未處理完,也就是說 C2 還沒有執(zhí)行 ack 代碼的時候,C2 被停掉了, 此時會看到消息被 C1 接收到了,說明消息 dd 被重新入隊,然后分配給能處理消息的 C1 處理了。
3.3 持久化
3.3.1 概念
剛剛我們已經(jīng)看到了如何處理任務不丟失的情況,但是如何保障當 RabbitMQ 服務停掉以后消息生產(chǎn)者發(fā)送過來的消息不丟失。默認情況下 RabbitMQ 退出或由于某種原因崩潰時,它忽視隊列和消息,除非告知它不要這樣做。確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標記為持久化。
3.3.2 如何實現(xiàn)持久化
之前我們創(chuàng)建的隊列都是非持久化的,rabbitmq 如果重啟的話,該隊列就會被刪除掉,如果 要隊列實現(xiàn)持久化需要在聲明隊列的時候把 durable 參數(shù)設置為持久化。
channel.queueDeclare(QUEUE_NAME, true, false, false, null);但是需要注意的就是如果之前聲明的隊列不是持久化的,需要把原先隊列先刪除,或者重新創(chuàng)建一個持久化的隊列,不然就會出現(xiàn)錯誤。
?
3.3.3 消息持久化
要想讓消息實現(xiàn)持久化需要在消息生產(chǎn)者修改代碼,MessageProperties.PERSISTENT_TEXT_PLAIN 添加這個屬性。
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());將消息標記為持久化并不能完全保證不會丟失消息。盡管它告訴 RabbitMQ 將消息保存到磁盤,但是這里依然存在當消息剛準備存儲在磁盤的時候但是還沒有存儲完,消息還在緩存的一個間隔點。此時并沒有真正寫入磁盤。持久性保證并不強,但是對于我們的簡單任務隊列而言,這已經(jīng)綽綽有余了。
3.3.4 不公平分發(fā)
在最開始的時候我們學習到 RabbitMQ 分發(fā)消息采用的輪訓分發(fā),但是在某種場景下這種策略并不是很好,比方說有兩個消費者在處理任務,其中有個消費者 1 處理任務的速度非???#xff0c;而另外一個消費者 2 處理速度卻很慢,這個時候我們還是采用輪訓分發(fā)的化就會到這處理速度快的這個消費者很大一部分時間 處于空閑狀態(tài),而處理慢的那個消費者一直在干活,這種分配方式在這種情況下其實就不太好,但是 RabbitMQ 并不知道這種情況它依然很公平的進行分發(fā)。
為了避免這種情況,我們可以設置參數(shù) channel.basicQos(1)
意思就是如果這個任務我還沒有處理完或者我還沒有應答你,你先別分配給我,我目前只能處理一個任務,然后 rabbitmq 就會把該任務分配給沒有那么忙的那個空閑消費者,當然如果所有的消費者都沒有完成手上任務,隊列還在不停的添加新任務,隊列有可能就會遇到隊列被撐滿的情況,這個時候就只能添加新的 worker 或者改變其他存儲任務的策略。
3.3.5 預期值
本身消息的發(fā)送就是異步發(fā)送的,所以在任何時候,channel 上肯定不止只有一個消息另外來自消費者的手動確認本質(zhì)上也是異步的。因此這里就存在一個未確認的消息緩沖區(qū),因此希望開發(fā)人員能限制此緩沖區(qū)的大小,以避免緩沖區(qū)里面無限制的未確認消息問題。這個時候就可以通過使用 basic.qos 方法設置“預取計數(shù)”值來完成的。該值定義通道上允許的未確認消息的最大數(shù)量。一旦數(shù)量達到配置的數(shù)量, RabbitMQ 將停止在通道上傳遞更多消息,除非至少有一個未處理的消息被確認,例如,假設在通道上有未確認的消息 5、6、7,8,并且通道的預取計數(shù)設置為 4,此時 RabbitMQ 將不會在該通道上再傳遞任何消息,除非至少有一個未應答的消息被 ack。比方說 tag=6 這個消息剛剛被確認 ACK,RabbitMQ 將會感知這個情況到并再發(fā)送一條消息。消息應答和 QoS 預取值對用戶吞吐量有重大影響。通常,增加預取將提高向消費者傳遞消息的速度。雖然自動應答傳輸消息速率是最佳的,但是,在這種情況下已傳遞但尚未處理的消息的數(shù)量也會增加,從而增加了消費者的 RAM 消耗(隨機存取存儲器)應該小心使用具有無限預處理的自動確認模式或手動確認模式,消費者消費了大量的消息如果沒有確認的話,會導致消費者連接節(jié)點的內(nèi)存消耗變大,所以找到合適的預取值是一個反復試驗的過程,不同的負載該值取值也不同 100 到 300 范 圍內(nèi)的值通??商峁┳罴训耐掏铝?#xff0c;并且不會給消費者帶來太大的風險。預取值為 1 是最保守的。當然這將使吞吐量變得很低,特別是消費者連接延遲很嚴重的情況下,特別是在消費者連接等待時間較長的環(huán)境中。對于大多數(shù)應用來說,稍微高一點的值將是最佳的。
?
總結
以上是生活随笔為你收集整理的RabbitMQ(二)工作队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ(一)helloworl
- 下一篇: RabbitMQ(三)发布确认