日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

tp5 mysql实现消息队列_TP5系列 | Queue消息队列

發(fā)布時間:2024/9/27 数据库 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 tp5 mysql实现消息队列_TP5系列 | Queue消息队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

消費信息如下ThinkPHP5 Queue消息隊列

優(yōu)點

1、Queue內置了 Redis,Database,Topthink ,Sync這四種驅動,本文使用Redis驅動

2、Queue消息隊列適用于大并發(fā)或者返回結果 時間有點長并需要批量操作的第三方接口,可用于短信發(fā)送、郵件發(fā)送、APP推送

3、Queue消息消息可進行發(fā)布,獲取,執(zhí)行,刪除,重發(fā),失敗處理,延遲執(zhí)行,超時控制等操作

流程圖

創(chuàng)建隊列

文件路徑:application\common\queue\TestQueue.php

TestQueue.php 參考代碼

namespace app\common\queue;

use think\facade\Log;

use think\queue\Job;

class TestQueue

{

public function fire(Job $job, $data)

{

$isJobDone = $this->testJob($data);

// 如果任務執(zhí)行成功后 記得刪除任務,不然這個任務會重復執(zhí)行,直到達到最大重試次數后失敗后,執(zhí)行failed方法

if ($isJobDone) {

$job->delete();

} else {

//通過這個方法可以檢查這個任務已經重試了幾次了

$attempts = $job->attempts();

echo $attempts;

if ($attempts == 0 || $attempts == 1) {

// 重新發(fā)布這個任務

$job->release(2); //$delay為延遲時間,延遲2S后繼續(xù)執(zhí)行

} elseif ($attempts == 2) {

$job->release(5); // 延遲5S后繼續(xù)執(zhí)行

}

}

}

/**

* @Desc: 任務執(zhí)行失敗后自動執(zhí)行方法

* @param $data

*/

public function failed($data)

{

// ...任務達到最大重試次數后,失敗了

Log::error('任務達到最大重試次數后,失敗了 '.json_encode($data));

}

/**

* @Desc: 自定義需要加入的隊列任務

*/

private function testJob($data)

{

$jsonData = json_encode($data);

echo "1、具體執(zhí)行任務接受到的參數:{$jsonData} \r\n";

if($data){

echo "2、恭喜你!{$data['email']} 郵件發(fā)送成功了 \r\n";

return true;

}else{

echo "2、很遺憾,{$data['email']} 郵件發(fā)送失敗了 \r\n";

return false;

}

}

}

配置隊列

1、這里使用Redis驅動來存儲隊列消息

2、隊列配置文件路徑:application\config\queue

配置參考代碼

return [

'connector' => 'Redis',

'expire' => 3600,

'default' => 'REDIS_QUEUE',

'host' => 'dnmp-redis',

'port' => 6379,

'password' => '',

'select' => 0,

'timeout' => 0,

'persistent' => false,

];

生產者參考代碼

/**

* @Desc: 生產者生產消息

*/

public function productMsg()

{

// 當前任務所需的業(yè)務數據,不能為 resource 類型,其他類型最終將轉化為json形式的字符串

$data = [

'email' => rand(11,99).'@qq.com',

'username' => 'Tinywan'

];

// 當前任務歸屬的隊列名稱,如果為新隊列,會自動創(chuàng)建

$queueName = 'testQueue';

// 將該任務推送到消息隊列,等待對應的消費者去執(zhí)行

$isPushed = Queue::push(TestQueue::class, $data, $queueName);

// database 驅動時,返回值為 1|false; redis驅動時,返回值為 隨機字符串|false

if ($isPushed !== false) {

echo '['.$data['email'].']'." 隊列加入成功 \r\n";

} else {

echo "隊列加入失敗 \r\n";

}

}

為了方便演示,這里使用cli模式。

執(zhí)行生產者:php product_msg.php

# php product_msg.php

[27@qq.com] 隊列加入成功

# php product_msg.php

[77@qq.com] 隊列加入成功

1、這時候消息已經被持久化到Redis中去了(通過列表去存儲)

2、推送成功,雖然我們這時候已經寫好了消費者,但是我們并沒有開始消費。但是推送消息依然是成功的。這個就是中間件的優(yōu)勢。他連接兩個系統(tǒng),但是又不會互相耦合,生產者并不會因為消費者的異常而影響到自己。

3、消息推送成功之后,如果沒有消費者,消息會堆積在隊列中。不過別怕,消息堆積很正常,并且一般的中間件堆積能力是非常強的。比如阿里就宣傳自己mq可以堆積上億條數據。

查看Redis消息與隊列

> docker exec -it dnmp-redis redis-cli

127.0.0.1:6379> keys *

127.0.0.1:6379> keys *

1) "queues:testQueue"

127.0.0.1:6379> TYPE queues:testQueue

list

127.0.0.1:6379> LRANGE queues:testQueue 0 -1

1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"27@qq.com\",\"username\":\"Tinywan\"},\"id\":\"MLgNb4LFALhtmp7HZtfXMFPRUT0r94Bi\",\"attempts\":1}"

2) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"

127.0.0.1:6379>

消費者

開始消費消息。執(zhí)行cli 命令 php think queue:work--queue隊列名稱

# php think queue:work --queue testQueue

1、具體執(zhí)行任務接受到的參數: {"email":"27@qq.com","username":"Tinywan"}

2、恭喜你!27@qq.com 郵件發(fā)送成功了

Processed: app\common\queue\TestQueue

這里每消費掉一條消息,Redis數據庫中將會減少一條消息

查看Redis隊列消息

127.0.0.1:6379> LRANGE queues:testQueue 0 -1

1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"

127.0.0.1:6379>

命令行掛起守護進程執(zhí)行

/usr/bin/php /var/www/tp5/think queue:work --daemon --queue testQueue --memory 256

--daemon 是否循環(huán)執(zhí)行,如果不加該參數則該命令處理完下一個消息就退出 --queue 要處理的隊列的名稱 --delay 0 如果本次任務執(zhí)行拋出異常且任務未被刪除時,設置其下次執(zhí)行前延遲多少秒,默認為0。 --memory 該進程允許使用的內存上限,以M為單位。

流程圖

消費信息如下

php think queue:work --daemon --queue testQueue

1、具體執(zhí)行任務接受到的參數: {"email":"77@qq.com","username":"Tinywan"}

2、恭喜你!77@qq.com 郵件發(fā)送成功了

Processed: app\common\queue\TestQueue

1、具體執(zhí)行任務接受到的參數: {"email":"80@qq.com","username":"Tinywan"}

2、恭喜你!80@qq.com 郵件發(fā)送成功了

Processed: app\common\queue\TestQueue

1、具體執(zhí)行任務接受到的參數: {"email":"34@qq.com","username":"Tinywan"}

2、恭喜你!34@qq.com 郵件發(fā)送成功了

Processed: app\common\queue\TestQueue

1、命令行模式可以常駐內存不停的執(zhí)行php代碼。這樣就可以達到類似于靜態(tài)語言的java的效果。

2、一開始監(jiān)聽隊列。剛剛在隊列中堆積的消息立刻就被獲取到,開始執(zhí)行了代碼。最后執(zhí)行完成,刪除了消息。

3、在 queue:work--daemon 單進程循環(huán)消費的時候,改了代碼是不會生效的。這時腳本語言有點類似于靜態(tài)語言在執(zhí)行。所以需要我們用queue:restart重啟 work 進程 。

命令行掛起守護進程執(zhí)行

/usr/local/php/bin/php /data/wwwroot/default/thinkphp_5/think queue:work --daemon --queue testQueue --memory 256

查看進程是否在運行

# ps

PID USER TIME COMMAND

1 root 0:00 php-fpm: master process (/usr/local/etc/php-fpm.conf)

6 www-data 0:00 php-fpm: pool www

7 www-data 0:00 php-fpm: pool www

16 root 0:00 sh

56 root 0:00 sh

113 root 0:00 php think queue:work --daemon --queue testQueue

你再也不用守在終端了,以后只負責生產消息就可以了。Redis隊列也不會積累消息了

其他(中間件)

中間件系統(tǒng)的定義是兩個獨立的不同的系統(tǒng)在中間構建起傳遞消息的工具。但是同一個系統(tǒng)也可以通過中間件來榨取性能,大家肯定項目中遇到過性能瓶頸。

比如發(fā)送郵件,發(fā)送短信,轉換視頻格式等等。這些業(yè)務都是比較耗性能,又對先后順序不敏感的業(yè)務。這種業(yè)務就非常適合使用消息隊列系統(tǒng)來異步處理,使性能提升。

重啟隊列和生成隊列

總結

以上是生活随笔為你收集整理的tp5 mysql实现消息队列_TP5系列 | Queue消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。