日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > php >内容正文

php

RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)

發(fā)布時(shí)間:2023/12/9 php 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

發(fā)布/訂閱

在上篇第二部分教程中,我們搭建了一個(gè)工作隊(duì)列。每個(gè)任務(wù)之分發(fā)給一個(gè)工作者(worker)。在本篇教程中,我們要做的之前完全不一樣——分發(fā)一個(gè)消息給多個(gè)消費(fèi)者(consumers)。這種模式被稱(chēng)為“發(fā)布/訂閱”。

為了描述這種模式,我們將會(huì)構(gòu)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。它包括兩個(gè)程序——第一個(gè)程序負(fù)責(zé)發(fā)送日志消息,第二個(gè)程序負(fù)責(zé)獲取消息并輸出內(nèi)容。

在我們的這個(gè)日志系統(tǒng)中,所有正在運(yùn)行的接收方程序都會(huì)接受消息。我們用其中一個(gè)接收者(receiver)把日志寫(xiě)入硬盤(pán)中,另外一個(gè)接受者(receiver)把日志輸出到屏幕上。

最終,日志消息被廣播給所有的接受者(receivers)。

交換器(Exchanges)

前面的教程,我們發(fā)送消息到隊(duì)列并從中取出消息?,F(xiàn)在是時(shí)候介紹RabbitMq中完整的消息模型了。

讓我們簡(jiǎn)單的概括一下之前的教程:

  • 發(fā)布者(producer)是發(fā)布消息的應(yīng)用程序。
  • 隊(duì)列(queue)用于消息存儲(chǔ)的緩沖。
  • 消費(fèi)者(consumer)是接收消息的應(yīng)用程序。

RabbitMQ消息模型的核心理念是:發(fā)布者(producer)不會(huì)直接發(fā)送任何消息給隊(duì)列。事實(shí)上,發(fā)布者(producer)甚至不知道消息是否已經(jīng)被投遞到隊(duì)列。

發(fā)布者(producer)只需要把消息發(fā)送給一個(gè)交換器(exchange)。交換器非常簡(jiǎn)單,它一邊從發(fā)布者方接收消息,一邊把消息推入隊(duì)列。交換器必須知道如何處理它接收到的消息,是應(yīng)該推送到指定的隊(duì)列還是是多個(gè)隊(duì)列,或者是直接忽略消息。這些規(guī)則是通過(guò)exchange type來(lái)定義的。

有幾個(gè)可供選擇的交換器類(lèi)型:AMQPEXTYPEDIRECT,AMQPEXTYPEFANOUT,AMQPEXTYPEHEADER?orAMQPEXTYPETOPIC。我們?cè)谶@里主要說(shuō)明AMQPEXTYPE_FANOUT。先創(chuàng)建一個(gè)fanout類(lèi)型的交換器,命名為logs:

$exchange->setName('logs'); $exchange->setType(AMQP_EX_TYPE_FANOUT); $exchange->declare();

fanout交換器很簡(jiǎn)單,你可能從名字上就能猜測(cè)出來(lái),它把消息發(fā)送給它所知道的所有隊(duì)列。這正是我們的日志系統(tǒng)所需要的。

交換器列表

rabbitmqctl能夠列出服務(wù)器上所有的交換器:

$ sudo rabbitmqctl list_exchanges Listing exchanges ... logs fanout amq.direct direct amq.topic topic amq.fanout fanout amq.headers headers ...done.

這個(gè)列表中有一些叫做amq.*的交換器。這些都是默認(rèn)創(chuàng)建的,不過(guò)這時(shí)候你還不需要使用他們。

匿名的交換器

前面的教程中我們對(duì)交換器一無(wú)所知,但仍然能夠發(fā)送消息到隊(duì)列中。因?yàn)槲覀兪褂昧嗣麨榭兆址?“”)默認(rèn)的交換器。 回想我們之前是如何發(fā)布一則消息:

``` $exchange->publish($message, $routeKey);

```

exchange參數(shù)就是交換器的名稱(chēng)??兆址砟J(rèn)或者匿名交換器:消息將會(huì)根據(jù)指定的routing_key分發(fā)到指定的隊(duì)列。

在PHP的AMQP中如果exchange設(shè)置為匿名的話(huà),是報(bào)錯(cuò)的:PHP Fatal error: Uncaught exception ‘AMQPExchangeException’ with message ‘Invalid exchange name given, must be between 1 and 255 characters long.’

現(xiàn)在,我們就可以發(fā)送消息到一個(gè)具名交換器了:

$exchange->publish($message, '');

臨時(shí)隊(duì)列

你還記得之前我們使用的隊(duì)列名嗎( hello和task_queue)?給一個(gè)隊(duì)列命名是很重要的——我們需要把工作者(workers)指向正確的隊(duì)列。如果你打算在發(fā)布者(producers)和消費(fèi)者(consumers)之間共享同隊(duì)列的話(huà),給隊(duì)列命名是十分重要的。

但是這并不適用于我們的日志系統(tǒng)。我們打算接收所有的日志消息,而不僅僅是一小部分。我們關(guān)心的是最新的消息而不是舊的。為了解決這個(gè)問(wèn)題,我們需要做兩件事情。

首先,當(dāng)我們連接上RabbitMQ的時(shí)候,我們需要一個(gè)全新的、空的隊(duì)列。我們可以手動(dòng)創(chuàng)建一個(gè)隨機(jī)的隊(duì)列名,或者讓服務(wù)器為我們選擇一個(gè)隨機(jī)的隊(duì)列名(推薦)。我們只要在調(diào)用$queue->declare();方法的時(shí)候,不提供queue參數(shù)就可以了:

$queue = new AMQPQueue($channel); $queue->setFlags(AMQP_EXCLUSIVE); $queue->declare();

這時(shí)候我們可以通過(guò)$queue->getName();獲得已經(jīng)生成的隨機(jī)隊(duì)列名。它可能是這樣子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。

第二步,當(dāng)與消費(fèi)者(consumer)斷開(kāi)連接的時(shí)候,這個(gè)隊(duì)列應(yīng)當(dāng)被刪除。我們可以使用exclusive標(biāo)識(shí)。

$queue->setFlags(AMQP_EXCLUSIVE);

綁定(Bindings)

我們已經(jīng)創(chuàng)建了一個(gè)fanout類(lèi)型的交換器和一個(gè)隊(duì)列?,F(xiàn)在我們需要告訴交換器如何發(fā)送消息給我們的隊(duì)列。交換器和隊(duì)列之間的關(guān)系我們稱(chēng)之為綁定(binding)。

$queue->bind($exchangeName, $queue->getName());

現(xiàn)在,logs交換器將會(huì)把消息添加到我們的隊(duì)列中。

綁定列表。

你可以使用rabbitmqctl list_bindings隊(duì)列出所有存在的綁定。.

整合代碼

發(fā)布日志消息的程序看起來(lái)和之前的沒(méi)有太大區(qū)別。最重要的改變就是我們把消息發(fā)送給logs交換器而不是匿名交換器。在發(fā)送的時(shí)候我們需要提供routingkey參數(shù),但是它的值會(huì)被fanout交換器忽略。

emit_log.php

<?php$exchangeName = 'logs'; $message = empty($argv[1]) ? 'info:Hello World!' : ' '.$argv[1];$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n");$channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); //$exchange->declare(); $exchange->declareExchange();$exchange->publish($message, ''); var_dump("[x] Sent $message");$connection->disconnect();

正如你看到的那樣,在連接成功之后,我們聲明了一個(gè)交換器,這一個(gè)是很重要的,因?yàn)椴辉试S發(fā)布消息到不存在的交換器。

如果沒(méi)有綁定隊(duì)列到交換器,消息將會(huì)丟失。但這個(gè)沒(méi)有所謂,如果沒(méi)有消費(fèi)者監(jiān)聽(tīng),那么消息就會(huì)被忽略。

receive_logs.php

<?php$exchangeName = 'logs';$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); //$exchange->declare(); $exchange->declareExchange(); $queue = new AMQPQueue($channel); $queue->setFlags(AMQP_EXCLUSIVE); //$exchange->declare(); $queue->declareQueue(); $queue->bind($exchangeName, '');var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) {$queue->consume('callback'); } $connection->disconnect();function callback($envelope, $queue) {$msg = $envelope->getBody();var_dump(" [x] Received:" . $msg);$queue->nack($envelope->getDeliveryTag()); }

這樣我們就完成了。如果你想把日志保存到文件中,只需要打開(kāi)控制臺(tái)輸入:

php receive_logs.php > logs_from_rabbit.log

如果你想在屏幕中查看日志,那么打開(kāi)一個(gè)新的終端然后運(yùn)行:

php receive_logs.php

當(dāng)然還要發(fā)送日志:

php emit_log.php

?

使用方法,需要打開(kāi)三個(gè)終端,兩個(gè)用來(lái)運(yùn)行receive_logs.php腳本【先啟動(dòng)】,一個(gè)用來(lái)運(yùn)行emit_log.php腳本【后啟動(dòng)】,結(jié)果為兩個(gè)receive_logs.php腳本會(huì)同時(shí)受到消息

運(yùn)行效果:

?

轉(zhuǎn)載于:https://www.cnblogs.com/-mrl/p/11102740.html

總結(jié)

以上是生活随笔為你收集整理的RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。