日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rabbitmq代码

發布時間:2024/9/30 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq代码 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

          • 1.生產者代碼
          • 2.消費者代碼
            • 2.1 消費冪等性代碼
            • 2.2 消費者rpc代碼
            • 2.3 消費者消費重試
            • 2.4 消費者直接交換機代碼
            • 2.5 基礎代碼

1.生產者代碼
<?php namespace app\controller;use app\BaseController; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; use think\facade\Cache;class RabbitMq extends BaseController {/*** 直接發送到隊列* @return string*/public function send(){ // echo "hello ,mq";die();//隊列名稱,每個消息都會被投入到1個或者多個隊列$queue = "hello_durable_true";//建立連接$connection = new AMQPStreamConnection("localhost", '5672', 'guest', 'guest', '/');//獲取信道$channel = $connection->channel();//聲明創建隊列$channel->queue_declare($queue, false, true, false, false);for ($i = 0; $i < 5; $i++) {sleep(1); //休眠1秒//消息內容$messageBody = "hello,努力,Now time:" . date("Y-m-d H:i:s");//將我們需要的消息標記為持久化$message = new AMQPMessage($messageBody, array("content_type" => "text/plain", "delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT));//發送消息$channel->basic_publish($message, '', $queue);echo "send Message" . $i . "<br>\n";}//關閉信道$channel->close();//關閉連接$connection->close();return "send success";}/*** 發布訂閱*/public function send1(){//交換機名稱$exchange = "logs";//建立連接$connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");//獲取信道$channel = $connection->channel();//聲明交換機$channel->exchange_declare($exchange, "fanout", false, false, false);for ($i = 0; $i < 5; $i++) {sleep(1);//休眠1秒//消息內容$messageBody = "hello,努力,Now time:" . date("Y-m-d H:i:s");//將我們需要的消息標記為持久化$message = new AMQPMessage($messageBody, array("content_type" => "text/plain","delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT));//發送消息$channel->basic_publish($message, $exchange);echo "Send exchange message:" . $i . "<br>\n";}//關閉信道$channel->close();//關閉連接$connection->close();return "Send Sueccess";}/*** 直接交換機* @return string*/public function direct(){ // echo "direct";die();//交換機名稱$exchange = "direct_logs";//建立連接$connection = new AMQPStreamConnection("127.0.0.1","5672","guest","guest","/");//獲取信道$channel = $connection->channel();//聲明交換機$channel->exchange_declare($exchange,"direct",false,false,false);//模擬發送error消息內容$messageBody = "error,Now Time:".date("Y-m-d H:i:s");//將我們需要的消息標記為持久化$message = new AMQPMessage($messageBody,array('content_type' => 'text/plain','delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));//綁定info路由$channel->basic_publish($message,$exchange,"error");//模擬發送warning消息內容$messageBody = "warning, Now Time:".date("h:i:s");//將我們需要的消息標記為持久化 - 通過設置AMQPMessage的參數delivery_mode = AMQPMessage::DELIVERY_MODE_PERSISTENT$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain','delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));//綁定warning路由$channel->basic_publish($message, $exchange, "warning");//關閉信道$channel->close();//關閉連接$connection->close();return 'Send Success';}/*** 遠程調用*/public function rpc(){$connection = new AMQPStreamConnection("localhost","5672","guest","guest","/");$channel = $connection->channel();//申明回調隊列list($callback_queue,,) = $channel->queue_declare("callback_queue",false,false,true,false);//RPC客戶端請求參數$corr_id = uniqid();$msg = new AMQPMessage("rpc client send message",array("correlation_id"=>$corr_id,'reply_to' =>$callback_queue));//發送RPC請求$channel->basic_publish($msg,'','rpc_queue');//在rpc服務端返回的內容$response = null;//等待RPC回調$channel->basic_consume($callback_queue,'',false,false,false,false,function ($reply)use ($corr_id,&$response){if ($reply->get("correlation_id") == $corr_id){$response = $reply->body;}//確認消息處理完成$reply->delivery_info['channel']->basic_ack($reply->delivery_info['delivery_tag']);});while (!$response){$channel->wait();}var_dump($response);$channel->close();$connection->close();}/*** 延遲隊列 死信隊列* @return string*/public function delay(){//建立連接$connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");//獲取信道$channel = $connection->channel();//創建DLX及死信隊列$channel->exchange_declare("dlx_exchange", "direct", false, false, false);$channel->queue_declare("dlx_queue",false,true,false,false);$channel->queue_bind("dlx_queue","dlx_exchange","dlx_routing_key");//創建延遲隊列$channel->exchange_declare("delay_exchange",'direct',false,false,false);$args = new AMQPTable();//消息過期方式:設置queue.normal隊列中的消息,5s后過期$args->set('x-message-ttl',5000);//設置隊列最大長度方式:x-max-length$args->set("x-max-length",1);$args->set('x-dead-letter-exchange','dlx_exchange');$args->set('x-dead-letter-routing-key',"dlx_routing_key");$channel->queue_declare("delay_queue",false,true,false,false,false,$args);$channel->queue_bind("delay_queue","delay_exchange","delay_routing_key");//模擬發送消息內容$messageBody = "該消息將在5s后發送到延遲隊列(".date("h:i:s").")";//將我們需要的消息標記為持久化 - 通過設置AMQPMessage的參數delivery_mode = AMQPMessage::DELIVERY_MODE_PERSISTENT$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain','delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));//綁定delay.routingKey路由$channel->basic_publish($message, "delay_exchange", "delay_routing_key");//關閉信道$channel->close();//關閉連接$connection->close();return 'Send Success';}/*** 重試隊列*/public function retry(){//建立連接$connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");//獲取信道$channel = $connection->channel();//創建DLX及死信隊列$channel->exchange_declare("dlx_exchange", "direct", false, false, false);$channel->queue_declare("dlx_queue", false, true, false, false);$channel->queue_bind("dlx_queue", "dlx_exchange", "dlx_routing_key");//創建延遲隊列$channel->exchange_declare("delay_exchange", "direct", false, false, false);$args = new AMQPTable();// 消息過期方式:設置 queue.normal 隊列中的消息5s之后過期$args->set('x-message-ttl', 5000);// 設置隊列最大長度方式: x-max-length // $args->set('x-max-length', 100);$args->set('x-dead-letter-exchange', 'dlx_exchange');$args->set('x-dead-letter-routing-key', 'dlx_routing_key');$channel->queue_declare("delay_queue", false, true, false, false, false, $args);$channel->queue_bind("delay_queue", "delay_exchange", "delay_routing_key");//模擬發送消息內容$messageBody = "該消息將在5s后發送到延遲隊列(".date("h:i:s").")";//將我們需要的消息標記為持久化 - 通過設置AMQPMessage的參數delivery_mode = AMQPMessage::DELIVERY_MODE_PERSISTENT$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain','delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));//設置重試次數,默認0$headers = new AMQPTable(["retry_nums" => 0]);$message->set('application_headers', $headers);//綁定delay.routingKey路由$channel->basic_publish($message, "delay_exchange", "delay_routing_key");//關閉信道$channel->close();//關閉連接$connection->close();return 'Send Success';}/*** 消費冪等* @return string*/public function idempotent(){//建立連接$connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");//獲取信道$channel = $connection->channel();//創建DLX及死信隊列$channel->exchange_declare("dlx_exchange1", "direct", false, false, false);$channel->queue_declare("dlx_queue1", false, true, false, false);$channel->queue_bind("dlx_queue1", "dlx_exchange1", "dlx_routing_key1");//創建延遲隊列$channel->exchange_declare("delay_exchange1", "direct", false, false, false);$args = new AMQPTable();// 消息過期方式:設置 queue.normal 隊列中的消息5s之后過期$args->set('x-message-ttl', 5000);// 設置隊列最大長度方式: x-max-length//$args->set('x-max-length', 1);$args->set('x-dead-letter-exchange', 'dlx_exchange1');$args->set('x-dead-letter-routing-key', 'dlx_routing_key1');$channel->queue_declare("delay_queue1", false, true, false, false, false, $args);$channel->queue_bind("delay_queue1", "delay_exchange1", "delay_routing_key1");//模擬發送消息內容$messageBody = "重復消息發送(".date("h:i:s").")";//將我們需要的消息標記為持久化 - 通過設置AMQPMessage的參數delivery_mode = AMQPMessage::DELIVERY_MODE_PERSISTENT$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain','delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));//設置消息ID,防止重復消費$corr_id = uniqid();$headers = new AMQPTable(["correlation_id" => $corr_id]);$message->set('application_headers', $headers);Cache::set($corr_id, $corr_id, 3600);//綁定delay.routingKey路由$channel->basic_publish($message, "delay_exchange1", "delay_routing_key1");//模擬發送重復消息$channel->basic_publish($message, "delay_exchange1", "delay_routing_key1");//關閉信道$channel->close();//關閉連接$connection->close();return 'Send Success';} }
2.消費者代碼
2.1 消費冪等性代碼
<?phpnamespace app\command;use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Wire\AMQPTable; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\input\Option; use think\console\Output; use think\facade\Cache;class Idempotentnd extends Command {protected function configure(){// 指令配置$this->setName('idempotent')->setDescription('the idempotent command'); }protected function execute(Input $input, Output $output){//交換機名$exchange = "dlx_exchange1";//建立連接$connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");//獲取信道$channel = $connection->channel();//聲明交換機$channel->exchange_declare($exchange, 'direct', false, false, false);$channel->queue_declare("dlx_queue1", false, true, false, false);$channel->queue_bind("dlx_queue1", $exchange, "dlx_routing_key1");//創建重試隊列$channel->exchange_declare("delay_exchange1", "direct", false, false, false);$args = new AMQPTable();// 消息過期方式:設置 queue.normal 隊列中的消息5s之后過期$args->set('x-message-ttl', 5000);// 設置隊列最大長度方式: x-max-length //$args->set('x-max-length', 1);$args->set('x-dead-letter-exchange', 'dlx_exchange1');$args->set('x-dead-letter-routing-key', 'dlx_routing_key1');$channel->queue_declare("delay_queue1", false, true, false, false, false, $args);$channel->queue_bind("delay_queue1", "delay_exchange1", "delay_routing_key1");//消息消費$channel->basic_consume("dlx_queue1", '', false, false, false, false, function ($msg) use ($output, $channel) {$msg_headers = $msg->get('application_headers')->getNativeData();$corr_id = $msg_headers['correlation_id'];//判斷是否已經消費過if(Cache::get($corr_id) === null){$body = "該消息已消費,不再消費";$output->writeln(date("h:i:s") . $body . PHP_EOL);//確認消息處理完成$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);return;}$body = $msg->body;$output->writeln("生產者發送的消息:".date("h:i:s") . " Received " . $body . PHP_EOL);//確認消息處理完成$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);Cache::delete($corr_id);});while (count($channel->callbacks)) {$channel->wait();}//關閉信道$channel->close();//關閉連接$connection->close();} }
2.2 消費者rpc代碼
<?php declare (strict_types = 1);namespace app\command;use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\input\Option; use think\console\Output;class RpcCommand extends Command {protected function configure(){// 指令配置$this->setName('rpc')->setDescription('the rpc command'); }protected function execute1(Input $input, Output $output){$queue = "rpc_queue";$connection = new AMQPStreamConnection("localhost","5672","guest","guest","/");$channel = $connection->channel();$channel->queue_declare($queue,false,false,true,false);//公平調度$channel->basic_qos(null,1,null);$channel->basic_consume($queue,'',false,false,false,false,function ($msg) use($output){//接受rpc客戶端接收的消息$output->writeln("Received:".$msg->body.PHP_EOL);//執行方法,回調隊列$reply = new AMQPMessage("rpc server replay message",array("correlation_id"=>$msg->get('correlation_id')));$msg->delivery_info['channel']->basic_publish($reply,'',$msg->get('reply_to'));//確認消息處理完成$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);});while (count($channel->callbacks)){$channel->wait();}//關閉信道$channel->close();//關閉連接$connection->close();}protected function execute(Input $input, Output $output){//隊列名$queue = "rpc_queue";//建立連接$connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");//獲取信道$channel = $connection->channel();//聲明創建隊列$channel->queue_declare($queue, false, false, true, false);//公平調度$channel->basic_qos(null, 1, null);//消息消費$channel->basic_consume($queue, '', false, false, false, false, function ($msg) use ($output) {//接收到RPC客戶端收到的消息$output->writeln(" Received " . $msg->body . PHP_EOL);//執行方法,回調隊列$reply = new AMQPMessage("rpc server replay message",array('correlation_id' => $msg->get('correlation_id')));$msg->delivery_info['channel']->basic_publish($reply, '', $msg->get('reply_to'));//確認消息處理完成$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);});while (count($channel->callbacks)) {$channel->wait();}//關閉信道$channel->close();//關閉連接$connection->close();} }
2.3 消費者消費重試
<?php declare (strict_types = 1);namespace app\command;use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Wire\AMQPTable; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\input\Option; use think\console\Output;class RetryCommand extends Command {protected function configure(){// 指令配置$this->setName('retry')->setDescription('the retry command'); }protected function execute(Input $input, Output $output){//交換機名$exchange = "dlx_exchange";//建立連接$connection = new AMQPStreamConnection("localhost", "5672", "guest", "guest", "/");//獲取信道$channel = $connection->channel();//聲明交換機$channel->exchange_declare($exchange, 'direct', false, false, false);$channel->queue_declare("dlx_queue", false, true, false, false);$channel->queue_bind("dlx_queue", $exchange, "dlx_routing_key");//創建重試隊列$channel->exchange_declare("delay_exchange", "direct", false, false, false);//消息消費$channel->basic_consume("dlx_queue", '', false, false, false, false, function ($msg)use($output,$channel){$body = $msg->body;$output->writeln(date("h:i:s") . " Received " . $body . PHP_EOL);//確認消息處理完成$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);$msg_headers = $msg->get('application_headers')->getNativeData();//重試次數超過3次,則入庫告警if (intval($msg_headers['retry_nums']) > 3) {$body = "重試次數超過3次,則入庫告警";$output->writeln(date("h:i:s") . " Error " . $body . PHP_EOL);} else {//重試次數加1$headers = new AMQPTable(["retry_nums" => intval($msg_headers['retry_nums']) + 1]);$msg->set('application_headers', $headers);//放回重試隊列$channel->basic_publish($msg, "delay_exchange", "delay_routing_key");}});while (count($channel->callbacks)) {$channel->wait();}//關閉信道$channel->close();//關閉連接$connection->close();} }
2.4 消費者直接交換機代碼
<?php namespace app\command;use PhpAmqpLib\Connection\AMQPStreamConnection; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\input\Option; use think\console\Output;class Direct1 extends Command {protected function configure(){// 指令配置$this->setName('directRabbitMQ1')->setDescription('the directRabbitMQ1 command'); }protected function execute(Input $input, Output $output){//交換機名$exchange = "direct_logs";//建立連接$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');//獲取信道$channel = $connection->channel();//聲明交換機$channel->exchange_declare($exchange, 'direct', false, false, false);//聲明創建隊列//隊列名稱為空時,會生成一個隨機名稱隊列list($queue, ,) = $channel->queue_declare('', false, false, true, false);//綁定交換機與隊列,并指定路由info$channel->queue_bind($queue, $exchange, 'info');//綁定交換機與隊列,并指定路由error$channel->queue_bind($queue, $exchange, 'error');//綁定交換機與隊列,并指定路由warning$channel->queue_bind($queue, $exchange, 'warning');//消息消費$channel->basic_consume($queue, '', false, false, false, false, function ($msg) use ($output) {//模擬耗時sleep(3);$output->writeln(" Received " . $msg->body . PHP_EOL);//確認消息處理完成$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);});while (count($channel->callbacks)) {$channel->wait();}//關閉信道$channel->close();//關閉連接$connection->close();} }
2.5 基礎代碼
<?php namespace app\command;use PhpAmqpLib\Connection\AMQPStreamConnection; use think\console\Command; use think\console\Input; use think\console\Output;class Rabbitmq extends Command {public function configure(){//指令配置$this->setName("mq")->setDescription("the mq command");}protected function execute(Input $input, Output $output){//隊列名$queue = "hello_durable_true";//建立連接$connection = new AMQPStreamConnection("localhost","5672","guest","guest");//獲取信道$channel = $connection->channel();//聲明創建隊列//生產者和消費者對queue的聲明函數里,這個durable必須保持一致。否則報錯PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello' in vhost '/': received 'true' but current is 'false'$channel->queue_declare($queue,false,true,false,false);//消息消費$channel->basic_consume($queue,'',false,false,false,false,function ($msg) use ($output){sleep(3);//輸出到終端$output->writeln("Received".$msg->body.PHP_EOL);//確認消息處理完成$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);});while (count($channel->callbacks)){$channel->wait();}//關閉信道$channel->close();//關閉連接$connection->close();} }

總結

以上是生活随笔為你收集整理的rabbitmq代码的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。