本文小編為大家詳細介紹“thinkphp6使用think-queue怎么實現(xiàn)普通隊列和延遲隊列”,內(nèi)容詳細,步驟清晰,細節(jié)處理妥當,希望這篇“thinkphp6使用think-queue怎么實現(xiàn)普通隊列和延遲隊列”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學(xué)習(xí)新知識吧。
網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)建站!專注于網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、微信小程序定制開發(fā)、集團企業(yè)網(wǎng)站建設(shè)等服務(wù)項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了納雍免費建站歡迎大家使用!
###TP6 隊列
TP6 中使用 think-queue 可以實現(xiàn)普通隊列和延遲隊列。
think-queue 是thinkphp 官方提供的一個消息隊列服務(wù),它支持消息隊列的一些基本特性:
消息的發(fā)布,獲取,執(zhí)行,刪除,重發(fā),失敗處理,延遲執(zhí)行,超時控制等
隊列的多隊列, 內(nèi)存限制 ,啟動,停止,守護等
消息隊列可降級為同步執(zhí)行
1、通過生產(chǎn)者推送消息到消息隊列服務(wù)中
2、消息隊列服務(wù)將收到的消息存入redis隊列中(zset)
3、消費者進行監(jiān)聽隊列,當監(jiān)聽到隊列有新的消息時,獲取隊列第一條
4、處理獲取下來的消息調(diào)用業(yè)務(wù)類進行處理相關(guān)業(yè)務(wù)
5、業(yè)務(wù)處理后,需要從隊列中刪除消息
composer require topthink/think-queue
安裝完 think-queue 后會在 config 目錄中生成 queue.php,這個文件是隊列的配置文件。
tp6中提供了多種消息隊列的實現(xiàn)方式,默認使用sync,我這里選擇使用Redis。
return [ 'default' => 'redis', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => env('redis.host', '127.0.0.1'), 'port' => env('redis.port', '6379'), 'password' => env('redis.password','123456'), 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ];
在 app 目錄下創(chuàng)建 queue 目錄,然后在該目錄下新建一個抽象類 Queue.php 文件,作為基礎(chǔ)類
<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/** * Class Queue 隊列消費基礎(chǔ)類 * @package app\queue */abstract class Queue{ /** * @describe:fire是消息隊列默認調(diào)用的方法 * @param \think\queue\Job $job * @param $message */ public function fire(Job $job, $data) { if (empty($data)) { Log::error(sprintf('[%s][%s] 隊列無消息', __CLASS__, __FUNCTION__)); return ; } $jobId = $job->getJobId(); // 隊列的數(shù)據(jù)庫id或者redis key // $jobClassName = $job->getName(); // 隊列對象類 // $queueName = $job->getQueue(); // 隊列名稱 // 如果已經(jīng)執(zhí)行中或者執(zhí)行完成就不再執(zhí)行了 if (!$this->checkJob($jobId, $data)) { $job->delete(); Cache::store('redis')->delete($jobId); return ; } // 執(zhí)行業(yè)務(wù)處理 if ($this->execute($data)) { Log::record(sprintf('[%s][%s] 隊列執(zhí)行成功', __CLASS__, __FUNCTION__)); $job->delete(); // 任務(wù)執(zhí)行成功后刪除 Cache::store('redis')->delete($jobId); // 刪除redis中的緩存 } else { // 檢查任務(wù)重試次數(shù) if ($job->attempts() > 3) { Log::error(sprintf('[%s][%s] 隊列執(zhí)行重試次數(shù)超過3次,執(zhí)行失敗', __CLASS__, __FUNCTION__)); // 第1種處理方式:重新發(fā)布任務(wù),該任務(wù)延遲10秒后再執(zhí)行;也可以不指定秒數(shù)立即執(zhí)行 //$job->release(10); // 第2種處理方式:原任務(wù)的基礎(chǔ)上1分鐘執(zhí)行一次并增加嘗試次數(shù) //$job->failed(); // 第3種處理方式:刪除任務(wù) $job->delete(); // 任務(wù)執(zhí)行后刪除 Cache::store('redis')->delete($jobId); // 刪除redis中的緩存 } } } /** * 消息在到達消費者時可能已經(jīng)不需要執(zhí)行了 * @param string $jobId * @param $message * @return bool 任務(wù)執(zhí)行的結(jié)果 * @throws \Psr\SimpleCache\InvalidArgumentException */ protected function checkJob(string $jobId, $message): bool { // 查詢redis $data = Cache::store('redis')->get($jobId); if (!empty($data)) { return false; } Cache::store('redis')->set($jobId, $message); return true; } /** * @describe: 根據(jù)消息中的數(shù)據(jù)進行實際的業(yè)務(wù)處理 * @param $data 數(shù)據(jù) * @return bool 返回結(jié)果 */ abstract protected function execute($data): bool;}
所有真正的消費類繼承基礎(chǔ)抽象類
<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{ protected function execute($data): bool { // 具體消費業(yè)務(wù)邏輯 }}
use think\facade\Queue; // 普通隊列生成調(diào)用方式 Queue::push($job, $data, $queueName); // 例: Queue::push(Test::class, $data, $queueName); // 延時隊列生成調(diào)用方式 Queue::later($delay, $job, $data, $queueName); // 例如使用延時隊列 10 秒后執(zhí)行: Queue::later(10 , Test::class, $data, $queueName);
php think queue:listen php think queue:work
queue:work 命令
work 命令: 該命令將啟動一個 work 進程來處理消息隊列。
php think queue:work --queue TestQueue
queue:listen 命令
listen 命令: 該命令將會創(chuàng)建一個 listen 父進程 ,然后由父進程通過 proc_open(‘php think queue:work’)
的方式來創(chuàng)建一個work 子 進程來處理消息隊列,且限制該work進程的執(zhí)行時間。
php think queue:listen --queue TestQueue
Work 模式
php think queue:work \ --daemon //是否循環(huán)執(zhí)行,如果不加該參數(shù),則該命令處理完下一個消息就退出 --queue helloJobQueue //要處理的隊列的名稱 --delay 0 \ //如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時,設(shè)置其下次執(zhí)行前延遲多少秒,默認為0 --force \ //系統(tǒng)處于維護狀態(tài)時是否仍然處理任務(wù),并未找到相關(guān)說明 --memory 128 \ //該進程允許使用的內(nèi)存上限,以 M 為單位 --sleep 3 \ //如果隊列中無任務(wù),則sleep多少秒后重新檢查(work+daemon模式)或者退出(listen或非daemon模式) --tries 2 //如果任務(wù)已經(jīng)超過嘗試次數(shù)上限,則觸發(fā)‘任務(wù)嘗試次數(shù)超限’事件,默認為0
Listen 模式
php think queue:listen \ --queue helloJobQueue \ //監(jiān)聽的隊列的名稱 --delay 0 \ //如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時,設(shè)置其下次執(zhí)行前延遲多少秒,默認為0 --memory 128 \ //該進程允許使用的內(nèi)存上限,以 M 為單位 --sleep 3 \ //如果隊列中無任務(wù),則多長時間后重新檢查,daemon模式下有效 --tries 0 \ //如果任務(wù)已經(jīng)超過重發(fā)次數(shù)上限,則進入失敗處理邏輯,默認為0 --timeout 60 //創(chuàng)建的work子進程的允許執(zhí)行的最長時間,以秒為單位
可以看到 listen 模式下,不包含 --deamon
參數(shù),原因下面會說明
消息隊列的開始,停止與重啟
開始一個消息隊列:
php think queue:work
停止所有的消息隊列:
php think queue:restart
重啟所有的消息隊列:
php think queue:restart php think queue:work
讀到這里,這篇“thinkphp6使用think-queue怎么實現(xiàn)普通隊列和延遲隊列”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領(lǐng)會,如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
名稱欄目:thinkphp6使用think-queue怎么實現(xiàn)普通隊列和延遲隊列
轉(zhuǎn)載來于:http://muchs.cn/article10/gppcgo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站策劃、電子商務(wù)、品牌網(wǎng)站制作、移動網(wǎng)站建設(shè)、面包屑導(dǎo)航、小程序開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)