rabbitmq队列php应用,RabbitMQ工作队列应用
工作隊(duì)列是說由一個(gè)生產(chǎn)者發(fā)送消息給隊(duì)列,而可能有一個(gè)或多個(gè)消費(fèi)者等待從隊(duì)列接收并處理消息
循環(huán)調(diào)度
當(dāng)有多個(gè)消費(fèi)者時(shí),隊(duì)列循環(huán)發(fā)送消息給每一個(gè)消費(fèi)者,默認(rèn)情況下,rabbitmq會(huì)按順序發(fā)送一條消息給下一個(gè)消費(fèi)者,平均每個(gè)消費(fèi)者會(huì)收到相同數(shù)量的消息
消息確認(rèn)
消息確認(rèn)是防止消息丟失,一個(gè)確認(rèn)信息會(huì)從消費(fèi)者返回,告訴rabbitmq該消費(fèi)已經(jīng)收到處理,這時(shí)rabbitmq才會(huì)釋放刪除消息。默認(rèn)情況下消息確認(rèn)是被關(guān)閉的,設(shè)置basic_consume()的第四個(gè)參數(shù)為false(true means no ack),且從消費(fèi)者發(fā)送一個(gè)確認(rèn)即可。
消息持久化
要確保消息不丟失我們需要標(biāo)識(shí)隊(duì)列和消息都是可持久的,首先要確保rabbitmq不會(huì)丟失隊(duì)列,我們需要聲明隊(duì)列為可持久的,這里設(shè)置queue_declare的第三個(gè)參數(shù)為true即可
$channel->queue_declare('task_queue', false, true, false, false);
其次我們需要通過delivery_mode = 2參數(shù)標(biāo)記消息是持久化的,如
$msg = new AMQPMessage($data,
array('delivery_mode' => 2) # make message persistent
);
公平調(diào)度
循環(huán)調(diào)度容易造成奇偶消息費(fèi)者閑忙不等的情況,為了公平調(diào)度我們可以使用basic_qos方法,同時(shí)設(shè)置參數(shù)prefetch=1,如
$channel->basic_qos(null, 1, null);
生產(chǎn)者new_task.php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
//第3個(gè)參數(shù)為true,使消息隊(duì)列持久化
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array('delivery_mode' => 2) # 使消息持久化
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
消費(fèi)者worker.php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
//第3個(gè)參數(shù)為true,使消息隊(duì)列持久化
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
//發(fā)送回復(fù)確認(rèn)
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//合理分發(fā)消息,即等消費(fèi)者回復(fù)確認(rèn)之后不忙的時(shí)候再發(fā)送消息
$channel->basic_qos(null,1,null);
//第4個(gè)參數(shù)為false表示等待消費(fèi)者回復(fù)再刪除消息
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
總結(jié)
以上是生活随笔為你收集整理的rabbitmq队列php应用,RabbitMQ工作队列应用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: matlab摄像头录像保存在哪里,mat
- 下一篇: 动态规划算法php,php算法学习之动态