日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > php >内容正文

php

rabbitmq队列php应用,RabbitMQ工作队列应用

發(fā)布時(shí)間:2024/10/8 php 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq队列php应用,RabbitMQ工作队列应用 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

工作隊(duì)列是說由一個(gè)生產(chǎn)者發(fā)送消息給隊(duì)列,而可能有一個(gè)或多個(gè)消費(fèi)者等待從隊(duì)列接收并處理消息

循環(huán)調(diào)度

當(dāng)有多個(gè)消費(fèi)者時(shí),隊(duì)列循環(huán)發(fā)送消息給每一個(gè)消費(fèi)者,默認(rèn)情況下,rabbitmq會(huì)按順序發(fā)送一條消息給下一個(gè)消費(fèi)者,平均每個(gè)消費(fèi)者會(huì)收到相同數(shù)量的消息

消息確認(rèn)

消息確認(rèn)是防止消息丟失,一個(gè)確認(rèn)信息會(huì)從消費(fèi)者返回,告訴rabbitmq該消費(fèi)已經(jīng)收到處理,這時(shí)rabbitmq才會(huì)釋放刪除消息。默認(rèn)情況下消息確認(rèn)是被關(guān)閉的,設(shè)置basic_consume()的第四個(gè)參數(shù)為false(true means no ack),且從消費(fèi)者發(fā)送一個(gè)確認(rèn)即可。

消息持久化

要確保消息不丟失我們需要標(biāo)識(shí)隊(duì)列和消息都是可持久的,首先要確保rabbitmq不會(huì)丟失隊(duì)列,我們需要聲明隊(duì)列為可持久的,這里設(shè)置queue_declare的第三個(gè)參數(shù)為true即可

$channel->queue_declare('task_queue', false, true, false, false);

其次我們需要通過delivery_mode = 2參數(shù)標(biāo)記消息是持久化的,如

$msg = new AMQPMessage($data,

array('delivery_mode' => 2) # make message persistent

);

公平調(diào)度

循環(huán)調(diào)度容易造成奇偶消息費(fèi)者閑忙不等的情況,為了公平調(diào)度我們可以使用basic_qos方法,同時(shí)設(shè)置參數(shù)prefetch=1,如

$channel->basic_qos(null, 1, null);

生產(chǎn)者new_task.php

require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

//第3個(gè)參數(shù)為true,使消息隊(duì)列持久化

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));

if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data,

array('delivery_mode' => 2) # 使消息持久化

);

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();

$connection->close();

消費(fèi)者worker.php

require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

//第3個(gè)參數(shù)為true,使消息隊(duì)列持久化

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {

echo " [x] Received ", $msg->body, "\n";

sleep(substr_count($msg->body, '.'));

echo " [x] Done", "\n";

//發(fā)送回復(fù)確認(rèn)

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

};

//合理分發(fā)消息,即等消費(fèi)者回復(fù)確認(rèn)之后不忙的時(shí)候再發(fā)送消息

$channel->basic_qos(null,1,null);

//第4個(gè)參數(shù)為false表示等待消費(fèi)者回復(fù)再刪除消息

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();

$connection->close();

總結(jié)

以上是生活随笔為你收集整理的rabbitmq队列php应用,RabbitMQ工作队列应用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。