tp5 mysql实现消息队列_TP5系列 | Queue消息队列
消費信息如下ThinkPHP5 Queue消息隊列
優(yōu)點
1、Queue內置了 Redis,Database,Topthink ,Sync這四種驅動,本文使用Redis驅動
2、Queue消息隊列適用于大并發(fā)或者返回結果 時間有點長并需要批量操作的第三方接口,可用于短信發(fā)送、郵件發(fā)送、APP推送
3、Queue消息消息可進行發(fā)布,獲取,執(zhí)行,刪除,重發(fā),失敗處理,延遲執(zhí)行,超時控制等操作
流程圖
創(chuàng)建隊列
文件路徑:application\common\queue\TestQueue.php
TestQueue.php 參考代碼
namespace app\common\queue;
use think\facade\Log;
use think\queue\Job;
class TestQueue
{
public function fire(Job $job, $data)
{
$isJobDone = $this->testJob($data);
// 如果任務執(zhí)行成功后 記得刪除任務,不然這個任務會重復執(zhí)行,直到達到最大重試次數后失敗后,執(zhí)行failed方法
if ($isJobDone) {
$job->delete();
} else {
//通過這個方法可以檢查這個任務已經重試了幾次了
$attempts = $job->attempts();
echo $attempts;
if ($attempts == 0 || $attempts == 1) {
// 重新發(fā)布這個任務
$job->release(2); //$delay為延遲時間,延遲2S后繼續(xù)執(zhí)行
} elseif ($attempts == 2) {
$job->release(5); // 延遲5S后繼續(xù)執(zhí)行
}
}
}
/**
* @Desc: 任務執(zhí)行失敗后自動執(zhí)行方法
* @param $data
*/
public function failed($data)
{
// ...任務達到最大重試次數后,失敗了
Log::error('任務達到最大重試次數后,失敗了 '.json_encode($data));
}
/**
* @Desc: 自定義需要加入的隊列任務
*/
private function testJob($data)
{
$jsonData = json_encode($data);
echo "1、具體執(zhí)行任務接受到的參數:{$jsonData} \r\n";
if($data){
echo "2、恭喜你!{$data['email']} 郵件發(fā)送成功了 \r\n";
return true;
}else{
echo "2、很遺憾,{$data['email']} 郵件發(fā)送失敗了 \r\n";
return false;
}
}
}
配置隊列
1、這里使用Redis驅動來存儲隊列消息
2、隊列配置文件路徑:application\config\queue
配置參考代碼
return [
'connector' => 'Redis',
'expire' => 3600,
'default' => 'REDIS_QUEUE',
'host' => 'dnmp-redis',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
];
生產者參考代碼
/**
* @Desc: 生產者生產消息
*/
public function productMsg()
{
// 當前任務所需的業(yè)務數據,不能為 resource 類型,其他類型最終將轉化為json形式的字符串
$data = [
'email' => rand(11,99).'@qq.com',
'username' => 'Tinywan'
];
// 當前任務歸屬的隊列名稱,如果為新隊列,會自動創(chuàng)建
$queueName = 'testQueue';
// 將該任務推送到消息隊列,等待對應的消費者去執(zhí)行
$isPushed = Queue::push(TestQueue::class, $data, $queueName);
// database 驅動時,返回值為 1|false; redis驅動時,返回值為 隨機字符串|false
if ($isPushed !== false) {
echo '['.$data['email'].']'." 隊列加入成功 \r\n";
} else {
echo "隊列加入失敗 \r\n";
}
}
為了方便演示,這里使用cli模式。
執(zhí)行生產者:php product_msg.php
# php product_msg.php
[27@qq.com] 隊列加入成功
# php product_msg.php
[77@qq.com] 隊列加入成功
1、這時候消息已經被持久化到Redis中去了(通過列表去存儲)
2、推送成功,雖然我們這時候已經寫好了消費者,但是我們并沒有開始消費。但是推送消息依然是成功的。這個就是中間件的優(yōu)勢。他連接兩個系統(tǒng),但是又不會互相耦合,生產者并不會因為消費者的異常而影響到自己。
3、消息推送成功之后,如果沒有消費者,消息會堆積在隊列中。不過別怕,消息堆積很正常,并且一般的中間件堆積能力是非常強的。比如阿里就宣傳自己mq可以堆積上億條數據。
查看Redis消息與隊列
> docker exec -it dnmp-redis redis-cli
127.0.0.1:6379> keys *
127.0.0.1:6379> keys *
1) "queues:testQueue"
127.0.0.1:6379> TYPE queues:testQueue
list
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"27@qq.com\",\"username\":\"Tinywan\"},\"id\":\"MLgNb4LFALhtmp7HZtfXMFPRUT0r94Bi\",\"attempts\":1}"
2) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"
127.0.0.1:6379>
消費者
開始消費消息。執(zhí)行cli 命令 php think queue:work--queue隊列名稱
# php think queue:work --queue testQueue
1、具體執(zhí)行任務接受到的參數: {"email":"27@qq.com","username":"Tinywan"}
2、恭喜你!27@qq.com 郵件發(fā)送成功了
Processed: app\common\queue\TestQueue
這里每消費掉一條消息,Redis數據庫中將會減少一條消息
查看Redis隊列消息
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"
127.0.0.1:6379>
命令行掛起守護進程執(zhí)行
/usr/bin/php /var/www/tp5/think queue:work --daemon --queue testQueue --memory 256
--daemon 是否循環(huán)執(zhí)行,如果不加該參數則該命令處理完下一個消息就退出 --queue 要處理的隊列的名稱 --delay 0 如果本次任務執(zhí)行拋出異常且任務未被刪除時,設置其下次執(zhí)行前延遲多少秒,默認為0。 --memory 該進程允許使用的內存上限,以M為單位。
流程圖
消費信息如下
php think queue:work --daemon --queue testQueue
1、具體執(zhí)行任務接受到的參數: {"email":"77@qq.com","username":"Tinywan"}
2、恭喜你!77@qq.com 郵件發(fā)送成功了
Processed: app\common\queue\TestQueue
1、具體執(zhí)行任務接受到的參數: {"email":"80@qq.com","username":"Tinywan"}
2、恭喜你!80@qq.com 郵件發(fā)送成功了
Processed: app\common\queue\TestQueue
1、具體執(zhí)行任務接受到的參數: {"email":"34@qq.com","username":"Tinywan"}
2、恭喜你!34@qq.com 郵件發(fā)送成功了
Processed: app\common\queue\TestQueue
1、命令行模式可以常駐內存不停的執(zhí)行php代碼。這樣就可以達到類似于靜態(tài)語言的java的效果。
2、一開始監(jiān)聽隊列。剛剛在隊列中堆積的消息立刻就被獲取到,開始執(zhí)行了代碼。最后執(zhí)行完成,刪除了消息。
3、在 queue:work--daemon 單進程循環(huán)消費的時候,改了代碼是不會生效的。這時腳本語言有點類似于靜態(tài)語言在執(zhí)行。所以需要我們用queue:restart重啟 work 進程 。
命令行掛起守護進程執(zhí)行
/usr/local/php/bin/php /data/wwwroot/default/thinkphp_5/think queue:work --daemon --queue testQueue --memory 256
查看進程是否在運行
# ps
PID USER TIME COMMAND
1 root 0:00 php-fpm: master process (/usr/local/etc/php-fpm.conf)
6 www-data 0:00 php-fpm: pool www
7 www-data 0:00 php-fpm: pool www
16 root 0:00 sh
56 root 0:00 sh
113 root 0:00 php think queue:work --daemon --queue testQueue
你再也不用守在終端了,以后只負責生產消息就可以了。Redis隊列也不會積累消息了
其他(中間件)
中間件系統(tǒng)的定義是兩個獨立的不同的系統(tǒng)在中間構建起傳遞消息的工具。但是同一個系統(tǒng)也可以通過中間件來榨取性能,大家肯定項目中遇到過性能瓶頸。
比如發(fā)送郵件,發(fā)送短信,轉換視頻格式等等。這些業(yè)務都是比較耗性能,又對先后順序不敏感的業(yè)務。這種業(yè)務就非常適合使用消息隊列系統(tǒng)來異步處理,使性能提升。
重啟隊列和生成隊列
總結
以上是生活随笔為你收集整理的tp5 mysql实现消息队列_TP5系列 | Queue消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: z3735f android x86,英
- 下一篇: linux cmake编译源码,linu