日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > php >内容正文

php

work php高性能,RabbitMQ之工作(Work)模式(PHP版)-Go语言中文社区

發(fā)布時間:2024/4/11 php 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 work php高性能,RabbitMQ之工作(Work)模式(PHP版)-Go语言中文社区 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

工作模式(Work)

生產者(P)負責生產消息,將消息發(fā)送到隊列(queue)中,多個消費者(C)監(jiān)聽隊列,隊列有消息就進行消費;工作模式就是一個生產者對應多個消費者。

類庫

RabbitMQ使用的是AMQP協議。要使用她你就必須需要一個使用同樣協議的庫。這里使用php-amqplib,并且使用Composer依賴管理。

# 項目中添加一個composer.json文件

{

"require": {

"php-amqplib/php-amqplib": "^2.11"

}

}

# 使用Composer安裝(前提要安裝Composer)

composer install

生產者(消息發(fā)送方)

生產者連接到RabbitMQ Broker,將消息發(fā)送給Broker,Broker將消息存入對應隊列。 查看生產者代碼(publish.php)。

require_once __DIR__ . '/../vendor/autoload.php';

require_once __DIR__ . '/../conf/config.php';

use PhpAmqpLibConnectionAMQPStreamConnection;

use PhpAmqpLibMessageAMQPMessage;

use PhpAmqpLibExchangeAMQPExchangeType;

$exchange = 'work'; // 交換器名稱

$queue = 'work'; // 隊列名稱

try {

$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); // 建立連接到RabbitMQ服務器

$channel = $connection->channel(); // 建立通道

$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, false, false); // 試探性聲明一個交換機

$channel->queue_declare($queue, false, false, false, false); // 試探性聲明一個隊列

$channel->queue_bind($queue, $exchange); // 隊列綁定交換器

// 生產10條消息

for ($i = 0; $i < 10; $i++) {

$body = "工作模式下生成第【" . ($i + 1) . "】條消息";

$message = new AMQPMessage($body, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

$channel->basic_publish($message, $exchange);

echo ' [x] Sent ' . $body . "n";

sleep(1);

}

$channel->close(); // 關閉通道

$connection->close(); // 關閉連接

} catch (Exception $e) {

die($e->getMessage());

}

消費者(消息的接收方)

消費者連接到RabbitMQ Broker,消費者向RabbitMQ Broker請求響應隊列消息,RabbitMQ Broker回應并投遞相應隊列消息,消費者接收消息。查看消費者代碼(receive.php)。

require_once __DIR__ . '/../vendor/autoload.php';

require_once __DIR__ . '/../conf/config.php';

use PhpAmqpLibConnectionAMQPStreamConnection;

$queue = 'work'; // 隊列名稱

try {

$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); // 建立連接到RabbitMQ服務器

$channel = $connection->channel(); // 建立通道

$channel->queue_declare($queue, false, false, false, false); // 試探性聲明一個隊列

echo " [*] Waiting for messages. To exit press CTRL+Cn";

$callback = function ($msg) { // 回調函數

sleep(3);

// 手動確認消息是否正常消費,保證消息消費的冪等。

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

echo ' [x] Received ', $msg->body, "n";

};

// basic_qos方法設置參數prefetch_count = 1。這告訴RabbitMQ不要在一個時間給一個消費者多個消息(在處理和確認以前的消息之前,不要向消費者發(fā)送新消息。相反,它將發(fā)送給下一個仍然不忙的消費者)。

$channel->basic_qos(null, 1, null);

// basic_consume方法設置參數no_ack=false。告訴RabbitMQ消費消息需要手動確認。

$channel->basic_consume($queue, '', false, false, false, false, $callback);

while ($channel->is_consuming()) { // 循環(huán)獲取消息

$channel->wait();

}

$channel->close();

$connection->close();

} catch (Exception $e) {

die($e->getMessage());

}

測試

開啟兩個消費者,一個生產者。

查看結果。

其他

總結

以上是生活随笔為你收集整理的work php高性能,RabbitMQ之工作(Work)模式(PHP版)-Go语言中文社区的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。