Beanstalked的初步了解和使用(包括利用beanstalkd 秒杀消息队列的实现)
一? Beanstalkd 是什么
Beanstalkd,一個高性能、輕量級的分布式內存隊列系統
?
二? Beanstalkd 特性
1. 優先級(priority)
注:優先級就意味 支持任務插隊(數字越小,優先級越高,0的優先級最高)
2. 延遲(delay)
注:延遲意味著可以定義任務什么時間才開始被消費,也就實現了定時任務(比如為了增加網站活躍性,增加定時評論,定時點贊功能)
3. 持久化(persistent data)
注:Beanstalkd 支持定時將文件刷到日志文件里,即使beanstalkd宕機,重啟之后仍然可以找回文件
4. 預留(buried)
注:Beanstalkd支持把一個任務設置為預留,這樣,消費者就無法取出這個任務了,等合適的時機再把這個任務拿出來消費
5. 任務超時重發(time-to-run)
注:消費者必須在指定的時間內處理完這個任務,否則就認為消費者處理失敗,任務會被重新放到隊列,等待消費
三 管道(tube)與任務(job)
注:生產者生產任務,并根據業務需求將任務放到不同管道中,比如和注冊有關的任務放到注冊管道中,和訂單有關的放到訂單管道中
注:任務從進入管道到離開管道一共有5個狀態(ready,delayed,reserved,buried,delete)
1. 生產者將任務放到管道中,任務的狀態可以是ready(表示任務已經準備好,隨時可以被消費者讀取),也可以是delayed(任務在被生產者放入管道時,設置了延遲,比如設置了5s延遲,意味著5s之后,這個任務才會變成ready狀態,才可以被消費者讀取)
2. 消費者消費任務(消費者將處于ready狀態的任務讀出來后,被讀取處理的任務狀態變為reserved)
3. 消費者處理完任務后,任務的狀態可能是delete(刪除,處理成功),可能是buried(預留,意味著先把任務放一邊,等待條件成熟還要用),可能是ready,也可能是delayed,需要根據具體業務場景自己進行判斷定義
具體示意圖:
四 Beanstalkd 的安裝(git、方式)
注:beanstalkd是不支持windows的 ,必須要在Linux環境下(本地環境介紹 Linux, php7,mysql5.6,nginx,centos7.4)?
1.?yum install -y git?
2.??git clone https://github.com/kr/beanstalkd?
3.? cd beanstalkd?
4. make
5. make install
6. 查看安裝是否成功
五 開始使用Beanstalkd
1. 綁定地址和端口
beanstalkd -l 127.0.0.1 -p 11301 &
2. composer安裝pheanstalkd類(這個類在php使用中會簡化很多操作,非常方便)
--------------
2.1 安裝composer? ? ?curl -sS https://getcomposer.org/installer | php
2.2 將composer全局調用? ? ?mv composer.phar /usr/local/bin/composer
2.3??cd /usr/local/bin
2.4?chmod +x composer
-----------------------
2.5 composer require pda/pheanstalk
注:pheanstalkd 用法示例:https://github.com/pda/pheanstalk
3. 簡單使用
環境介紹(阿里云Linux服務器, 本地win10,Filezilla用于雙方文件傳輸 ,xshell用于連接遠程阿里云服務器)
3.1? 編寫demo.php
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; $ph = new Pheanstalk('127.0.0.1',11301); print_r($ph->stats());//查看目前pheanStalkd狀態信息
3.2? 將在本地編寫的demo.php代碼通過filezilla上傳到阿里云服務器 /usr/local/beanstalkd/beanstalkd下,并在linux下運行
4. pheanstalkd類常用的方法
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; $ph = new Pheanstalk('127.0.0.1',11301); //----------------------------------------維護類---------------------------------- //1.查看目前pheanStalkd狀態信息 //print_r($ph->stats()); //2.顯示目前存在的管道 //print_r($ph->listTubes()); //3.查看NewUsers管道的信息 //$ph->useTube('NewUsers')->put('test'); //$ph->useTube('NewUsers')->put('up'); //4.向NewUsers管道添加一個up任務 //print_r($ph->statsTube('NewUsers'));//3.查看NewUsers管道的信息 //6.查看指定管道中某一個任務的情況 //$job = $ph->watch('NewUsers')->reserve(); //5.從管道中取出任務(消費) //print_r($ph->statsJob($job)); //6.查看指定管道中某一個任務的情況 //7.查看任務id為1的任務詳情 //$job = $ph->peek(1);7.直接取出任務id為1的任務 [注:beanstalkd中所有任務的id都具有唯一性] //print_r($ph->statsJob($job));//查看任務id為1的任務詳情 //----------------------------------------生產類-------------------------------------- 第一種 put() //$tube = $ph->useTube('NewUsers');//連接NewUsers管道 //print_r($tube->put('four'));//向NewUsers管道添加任務four,并返回結果 //注: put()方法還有3個可選參數(依次為: 優先級priority,延遲時間delay,任務超時重發ttr) 第二種 putInTube() [注: putInTube()就是對useTube()和put()的封裝] //$res = $ph->putInTube('NewUsers','three');//向NewUsers管道添加任務three 注: putInTube()方法還有3個可選參數(依次為: 優先級priority,延遲時間delay,任務超時重發ttr) //print_r($res);//返回任務id //print_r($ph->statsTube('NewUsers'));//查看NewUsers管道的詳細情況 //---------------------------------------消費類-------------------------------------- // 1.watch 監聽NewUsers管道 [ 注: watch()同樣可以監聽多個管道 ] //$tube = $ph->watch('NewUsers'); //print_r($ph->listTubesWatched());//打印已經監聽的管道 // 2.watch 監聽多個管道 //$tube = $ph->watch('NewUsers') // ->watch('default'); //print_r($ph->listTubesWatched());//打印已經監聽的管道 // 3.ignore 監聽NewUsers管道,忽略default管道 //$tube = $ph->watch('NewUsers') // ->ignore('default'); //print_r($ph->listTubesWatched());//打印已經監聽的管道 // 4.reserve 監聽NewUsers管道,并且取出任務 //$job = $ph->watch('NewUsers') // ->reserve(); // 注reserve()有1個參數,阻塞的時間,過了阻塞時間,不管有沒有東西,直接返回 // //var_dump($job);//打印已經取出的任務 //$ph->delete($job);//刪除已經取出的任務 // 5.putInTube/put 向NewUsers管道寫入任務 [ 注:此為生產者方法,放到此處是為了方便理解 ] //$ph->putInTube('NewUsers','number_1',5); //$ph->putInTube('NewUsers','number_2',3); //$ph->putInTube('NewUsers','number_3',0); //$ph->putInTube('NewUsers','number_4',4); //print_r($ph->statsTube('NewUsers'));//5.查看NewUsers管道詳細信息 // 6.release 將取出的任務放回ready狀態,還有2個參數(優先級和延遲) //$job = $ph->watch('NewUsers')->reserve();//6.監聽NewUsers管道,并取出任務 //if (true) { // sleep(30); // $ph->release($job);//6.將任務取出之后,停留30秒,然后將任務狀態重新變為ready //} else { // $ph->delete($job); //} // 7.bury (預留) 將任務取出之后,發現后面執行的邏輯不成熟(比如發郵件,突然發現郵件服務器掛掉了), //或者說還不能執行后面的邏輯,需要把任務先封存起來,等待時機成熟了,再拿出這個任務進行消費 //$job = $ph->watch('NewUsers')->reserve();//取出任務 //$ph->bury($job);//取出任務后,將任務放到一邊(預留) // 8.peekBuried() 將處在bury狀態的任務讀取出來 //$job = $ph->peekBuried('NewUsers');//將NewUsers管道中處在bury狀態的任務讀取出來 //var_dump($ph->statsJob($job));//打印任務狀態(此時任務狀態應該是bury) // 9.kickJob() 將處在bury任務狀態的任務轉化為ready狀態 //$job = $ph->peekBuried('NewUsers');//將NewUsers管道中處在bury狀態的任務讀取出來 //$ph->kickJob($job); // 10.kick() 將處在bury任務狀態的任務轉化為ready狀態,有第二個參數int, 批量將任務id小于此數值的任務轉化為ready //$ph->useTube('NewUsers')->kick(65);//把NewUsers管道中任務id小于65,并且任務狀態處于bury的任務全部轉化為ready // 11.peekReady() 將管道中處于ready狀態的任務讀出來 //$job = $ph->peekReady('NewUser');//將NewUser管道中處于ready狀態的任務讀取出來 //var_dump($job); //$ph->delete($job); // 12.peekDelay() 將管道中所有處于delay狀態的任務讀取出來 //$job = $ph->peekDelayed('NewUser'); //var_dump($job); //$ph->delete($job); // 13.pauseTube() 對整個管道進行延遲設置,讓管道處于延遲狀態 //$ph->pauseTube('NewUser',10);//設置管道NewUser延遲時間為10s //$job = $ph->watch('NewUser')->reserve();//監聽NewUser管道,并取出任務 //var_dump($job); // 14.resumeTube() 恢復管道,讓管道處于不延遲狀態,立即被消費 //$ph->resumeTube('NewUser');//取消管道NewUser的延遲狀態,變為立即讀取 //$job = $ph->watch('NewUser')->reserve();//監聽NewUser管道,并取出任務 //var_dump($job); // 15.touch() 讓任務重新計算任務超時重發ttr時間,相當于給任務延長壽命5. 項目中的應用總結
生產者中常用的方法
useTube() : 如果沒有管道,則創建對應管道,有,則直接使用
put() : 向管道中放任務
消費者中常用的方法步驟:
1. watch():監聽管道
2. reserve():將管道中處于ready狀態的任務讀取出來
3.1 可以使用delete 方法刪除任務
3.2 可以使用release 方法將任務放回ready狀態
3.3 可以使用bury 方法將任務先放一邊(例如發郵件,郵箱服務器掛掉),等待條件成熟再取出來
6. 實際應用演示
producer.php
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; $ph = new Pheanstalk('127.0.0.1',11301); $ph->useTube('List')->put('goods'); $ph->useTube('List')->put('goods2'); $ph->useTube('List')->put('goods3'); $ph->useTube('List')->put('goods4'); $ph->useTube('List')->put('goods5'); //print_r($ph->statsTube('List'));//查看List管道的信息
consumer.php
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; $ph = new Pheanstalk('127.0.0.1',11301); $res = $ph->watch('List')->reserve();//監聽List管道,并將任務取出來 if ($res) {$ph->delete($res); var_dump($res); }
注:
生產者根據業務不同,將任務放到不同管道(管道用于存儲消費者生產的任務),比如將和注冊有關的任務通通放到注冊管道,和訂單有關的通通放入訂單管道
消費者將處于ready狀態的任務根據優先級逐個讀取出來
五? 使用Beanstalkd 實現類似redis秒殺活動
producer.php代碼:
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; //連接beanstalkd $ph = new Pheanstalk('127.0.0.1', 11301); $tube_name = 'SecKill2'; //使用SecKill2管道 $SEC = $ph->useTube($tube_name); //模擬100人請求秒殺 for ($i = 0; $i < 100; $i++) {$uid = rand(10000000, 99999999); //獲取當前隊列已經擁有的數量,如果人數少于十,則加入這個隊列 $total_jobs = $ph->statsTube($tube_name)['total-jobs']; $num = 10; if ($total_jobs < $num) {$SEC->put($uid);//向管道放任務 echo $uid . "秒殺成功"; } else {//如果當前隊列人數已經達到10人,則返回秒殺已完成 echo "秒殺已結束<br>"; } } print_r($ph->statsTube($tube_name));//查看SecKill2管道的信息注: 執行完producer.php代碼后,應當會在SecKill2管道中看到10個任務,每一個任務內容是一個uid
consumer.php代碼:
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; //連接BeanStalkd隊列系統 $ph = new Pheanstalk('127.0.0.1',11301); $tube_name = 'SecKill2'; //取出SecKill2管道的任務總數 $total_jobs = $ph->statsTube($tube_name)['total-jobs']; //PDO連接mysql數據庫 $dsn = "mysql:dbname=test;host=127.0.0.1"; $pdo = new PDO($dsn, 'root', '123456'); //循環取出管道中任務,并執行插入數據庫操作 for($i = 0; $i < $total_jobs; $i++){//監聽SecKill4管道,并將任務取出來 $job = $ph->watch($tube_name)->reserve(); //取出任務存儲的值uid $uid = $job->getData();//打印出的樣子 string(8) "24541944" if (!$uid) {sleep(2); continue; }//生成訂單號 $orderNum = build_order_no($uid); //生成訂單時間 $timeStamp = time(); //構造插入數組 $user_data = array('uid'=>$uid,'time_stamp'=>$timeStamp,'order_num'=>$orderNum); //將數據保存到數據庫 $sql = "insert into seckill (uid,time_stamp,order_num) values (:uid,:time_stamp,:order_num)"; $stmt = $pdo->prepare($sql); $res = $stmt->execute($user_data); //如果數據庫操作成功,則刪除任務 if ($res) {$ph->delete($job); }}//生成唯一訂單號 function build_order_no($uid){return substr(implode(NULL, array_map('ord', str_split(substr(uniqid(), 7, 13), 1))), 0, 8).$uid; }注: 執行完consumer.php文件之后, 數據表應該已經寫入了10個訂單
注:?
用到的數據表
create table `seckill`( `uid` int unsigned not null default '0', `time_stamp` int unsigned not null default '0', `order_num` bigint unsigned not null default '0', primary key (`uid`), key (order_num) )engine = myisam default charset= utf8;
參考視頻地址:?https://www.imooc.com/video/16016
這邊博客可能會用到?https://blog.csdn.net/ahjxhy2010/article/details/53196450
總結
以上是生活随笔為你收集整理的Beanstalked的初步了解和使用(包括利用beanstalkd 秒杀消息队列的实现)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Windows 下 Redis 的下载和
- 下一篇: 将域名绑定到ip上,并实现访问不同二级子