日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > php >内容正文

php

RabbitMQ+PHP 教程六(RPC)

發布時間:2023/12/6 php 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ+PHP 教程六(RPC) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

(using php-amqplib)

前提必讀

本教程假設RabbitMQ是安裝在標準端口上運行(5672)。如果您使用不同的主機、端口或憑據,則連接設置需要調整。

如果您在本教程中遇到困難,可以通過郵件列表與我們聯系。

開始

在第二個教程中,我們學習了如何使用工作隊列在多個工人之間分配耗時的任務。

但是如果我們需要在遠程計算機上運行一個函數并等待結果呢?嗯,那是另一回事了。這種模式通常稱為遠程過程調用或RPC。

在本教程中我們將使用RabbitMQ搭建一個RPC系統:一個客戶端和一個可擴展的RPC服務器。由于我們沒有任何值得分配的耗時的任務,所以我們將創建一個返回Fibonacci數的模擬一個RPC服務。

Client interface

為了說明如何使用RPC服務,我們將創建一個簡單的客戶類。它將公開一個名為調用的方法,該方法發送一個RPC請求并阻塞直到接收到結果為止:

$fibonacci_rpc = new FibonacciRpcClient(); $response = $fibonacci_rpc->call(30); echo " [.] Got ", $response, "\n";

關于RPC的一些建議

雖然RPC是計算中非常常見的模式,但它經常遭到批評。當程序員不知道函數調用是本地的,或者它是一個緩慢的RPC時,問題就出現了。這樣的混亂導致了不可預知的系統,并給調試增加了不必要的復雜性。而簡化軟件,濫用會導致難以維護的RPC代碼。

考慮到這一點,請考慮以下建議:

確保很明顯哪個函數調用是本地調用,并且它是遠程的。
記錄系統。使組件之間的依賴關系清晰。
處理錯誤案例。RPC服務器長時間處于下行狀態時,客戶端應如何響應?
有疑問時避免RPC。如果可以,則應該使用異步管道,而不是像阻塞這樣的RPC,結果被異步推送到下一個計算階段。

回調隊列(Callback queue)

一般在RabbitMQ做RPC是容易的。客戶端發送一條請求消息和一個響應消息的服務器回復。為了接收響應,我們需要向請求發送一個“回調”隊列地址。我們可以使用默認隊列。讓我們試試看:

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);$msg = new AMQPMessage($payload,array('reply_to' => $queue_name));$channel->basic_publish($msg, '', 'rpc_queue'); # ... then code to read a response message from the callback_queue ... 消息屬性
AMQP協議(0-9-1 protocol)預定義了一套14個屬性,去一個消息。大多數屬性很少使用,除了以下內容:

delivery_mode: 將消息標記為持久性。 (with a value of 2) or transient (1). 您可能會從第二個教程中記住這個屬性。
content_type:用來描述編碼的MIME類型。例如,對于常用的JSON編碼,將此屬性設置為應用程序/ JSON是一個很好的做法。
reply_to:常用的名字一個回調隊列。
correlation_id:有助于將RPC響應與請求關聯起來。

Correlation Id

在上面介紹的方法中,我們建議為每個RPC請求創建一個回調隊列。這是非常低效的,但幸運的是有一個更好的方法——讓我們為每個客戶機創建一個回調隊列。

這引發了一個新問題,在隊列中收到了響應,不清楚響應的請求屬于哪個。那時候correlation_id屬性用于。我們將把它設置為每個請求的唯一值。稍后,當我們在回調隊列中接收消息時,我們將查看這個屬性,并在此基礎上,我們將能夠將響應與請求匹配。如果我們看到一個未知的correlation_id值,我們可以安全地忽略信息-它不屬于我們的請求。

您可能會問,為什么我們應該忽略回調隊列中的未知消息,而不是失敗出錯呢?這是由于服務器端可能出現競爭情況。雖然不太可能,RPC服務器可能在發送完答案后死亡,但在發出請求的確認消息之前。如果發生這種情況,重新啟動的RPC服務器將再次處理請求。這就是為什么在客戶機上我們必須優雅地處理重復響應,而RPC應該理想地是冪等的。

總結

我們的RPC會像這樣工作:

當客戶端啟動時,它創建一個匿名的獨占回調隊列。

一個RPC請求,客戶端發送消息,兩個屬性:reply_to,設置回調隊列和correlation_id,它被設置為每個請求的唯一值。

請求被發送到一個rpc_queue隊列。

RPC worker(又名:服務器)正在等待該隊列上的請求。當一個請求時,它的工作和發送消息的結果返回給客戶端,使用從reply_to隊列。

客戶機等待回調隊列上的數據。當消息出現時,它檢查correlation_id屬性。如果它與請求的值匹配,則返回對應用程序的響應。

匯總

Fibonacci 遞歸源碼:

function fib($n) {if ($n == 0)return 0;if ($n == 1)return 1;return fib($n-1) + fib($n-2); } `` 我們聲明fibonacci(斐波那契)函數。它只假設有效的正整數輸入。(不要指望這一個能為大數字工作,而且這可能是最慢的遞歸實現)。我們的RPC服務器rpc_server.php代碼看起來像這樣:

<?php

require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('rpc_queue', false, false, false, false);

function fib($n) {

if ($n == 0)return 0; if ($n == 1)return 1; return fib($n-1) + fib($n-2);

}

echo " [x] Awaiting RPC requestsn";
$callback = function($req) {

$n = intval($req->body); echo " [.] fib(", $n, ")\n";$msg = new AMQPMessage((string) fib($n),array('correlation_id' => $req->get('correlation_id')));$req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to')); $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);

};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();
$connection->close();

?>

服務器代碼相當簡單:像往常一樣,我們從建立連接、通道和聲明隊列開始。我們可能需要運行多個服務器進程。為了分散負載同樣多的服務器需要設置`prefetch_count`, 設置`$channel.basic_qos`美元。我們用`basic_consume`訪問隊列。然后,我們進入while循環,在其中等待請求消息,完成工作并發送響應。我們rpc_client.php RPC客戶端代碼:

<?php

require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

class FibonacciRpcClient {

private $connection; private $channel; private $callback_queue; private $response; private $corr_id;public function __construct() {$this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');$this->channel = $this->connection->channel();list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);$this->channel->basic_consume($this->callback_queue, '', false, false, false, false,array($this, 'on_response')); } public function on_response($rep) {if($rep->get('correlation_id') == $this->corr_id) {$this->response = $rep->body;} }public function call($n) {$this->response = null;$this->corr_id = uniqid();$msg = new AMQPMessage((string) $n,array('correlation_id' => $this->corr_id,'reply_to' => $this->callback_queue));$this->channel->basic_publish($msg, '', 'rpc_queue');while(!$this->response) {$this->channel->wait();}return intval($this->response); }

};

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "n";

?>

現在是一個很好的時間來讓我們完整的示例源代碼rpc_client.php和rpc_server.php。我們的RPC服務現在準備好了。我們可以啟動服務器:

php rpc_server.php
# => [x] Awaiting RPC requests

請求斐波那契數運行客戶機:

php rpc_client.php
# => [x] Requesting fib(30)
``
這里介紹的設計并不是RPC服務的唯一實現,但它有一些重要的要點:

如果RPC服務器太慢,您可以通過運行另一個服務器來擴展。試著在一個新的控制臺再運行第一個:rpc_server.php。

在客戶端,RPC只需要發送和接收一條消息。不喜歡queue_declare需要同步調用。因此,RPC客戶機只需要一次RPC請求的一次網絡往返。

我們的代碼仍然非常簡單,并沒有試圖解決更復雜(但重要)的問題,例如:

如果沒有服務器運行,客戶端應該如何反應?

客戶端應該對RPC有某種超時嗎?

如果服務器發生故障并引發異常,是否應該轉發給客戶端?

在處理前防止無效傳入消息(如檢查邊界、類型)。

如果您想進行實驗,您可能會發現management UI對于查看隊列非常有用。

總結

以上是生活随笔為你收集整理的RabbitMQ+PHP 教程六(RPC)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。