playcatqueue的升級版之playcat/queue-webman

1.3
版本
2025-06-11
版本更新時(shí)間
66
安裝
3
star
簡介
之所以又重新做了一版是因?yàn)橹暗陌姹具€有一些功能沒實(shí)現(xiàn),但我又想能支持webman和tp+swoole,索性重構(gòu)了代碼。
- 支持Redis單機(jī)或集群 (redis >= 5.0)
- 支持Kafka
- 支持RabbitMQ
- 自定義異常與重試流程
- (新)支持延遲消息數(shù)據(jù)持久化
安裝
composer require "playcat/queue-webman"
1.配置
1.1
編輯 config\plugin\playcat\queue\ 目錄下的app.php和process.php。修改相應(yīng)內(nèi)容為自己環(huán)境使用的配置。
1.2 初始化數(shù)據(jù)庫(只需一次)
php webman timerserver:initdb
2.創(chuàng)建消費(fèi)任務(wù)
新建一個php的文件并且添加以下內(nèi)容:
<?php
namespace app\queue\playcat;
use Playcat\Queue\Protocols\ConsumerDataInterface;
use Playcat\Queue\Protocols\ConsumerInterface;
class playcatConsumer1 implements ConsumerInterface
{
//任務(wù)名稱,對應(yīng)發(fā)布消息的名稱
public $queue = 'playcatqueue';
public function consume(ConsumerData $payload)
{
//獲取發(fā)布到隊(duì)列時(shí)傳入的內(nèi)容
$data = $payload->getQueueData();
...你自己的業(yè)務(wù)邏輯
//休息10s
sleep(10);
echo('done!');
}
}
ConsumerData方法
- getID: 當(dāng)前消息的id
- getRetryCount(): 當(dāng)前任務(wù)已經(jīng)重試過的次數(shù)
- getQueueData(): 當(dāng)前任務(wù)傳入的參數(shù)
- getChannel(): 當(dāng)前所執(zhí)行的任務(wù)名稱
-
- -
將上面編寫好的任務(wù)文件保存項(xiàng)目中'app/queue/playcat/'下(如果目錄不存在則自己手動創(chuàng)建)
啟動服務(wù):
啟動:
php start.php start
重載:可在不重啟服務(wù)的情況下更新業(yè)務(wù)
php start.php reload
停止:
php start.php stop
如果沒有錯誤出現(xiàn)則表示啟動完成
添加任務(wù)并且提交到隊(duì)列中
use Playcat\Queue\Manager;
use Playcat\Queue\Protocols\ProducerData;
//使用協(xié)程的方式,如果需要并行數(shù)據(jù)發(fā)布需要自行實(shí)現(xiàn)Manager的連接池
//即時(shí)消費(fèi)消息
$payload = new ProducerData();
//對應(yīng)消費(fèi)隊(duì)列里的任務(wù)名稱
$payload->setChannel('test');
//對應(yīng)消費(fèi)隊(duì)列里的任務(wù)使用的數(shù)據(jù)
$payload->setQueueData([1,2,3,4]);
//推入隊(duì)列并且獲取消息id
$id = Manager::getInstance()->push($payload);
//延遲消費(fèi)消息
$payload_delay = new ProducerData();
$payload_delay->setChannel('test');
$payload_delay->setQueueData([6,7,8,9]);
//設(shè)置60秒后執(zhí)行的任務(wù)
$payload_delay->setDelayTime(60);
//推入隊(duì)列并且獲取消息id
$id = Manager::getInstance()->push($payload_delay);
//取消延遲消息
Manager::getInstance()->del($id);
ProducerData方法
- setChannel: 設(shè)置推入消息的隊(duì)列名稱
- setQueueData: 設(shè)置傳入到消費(fèi)任務(wù)的數(shù)據(jù)
- setDelayTime: 設(shè)置延遲時(shí)間(秒)
-
- -
異常與重試機(jī)制
任務(wù)在執(zhí)行過程中未拋出異常則默認(rèn)執(zhí)行成功,否則則進(jìn)入重試階段.
重試次數(shù)和時(shí)間由配置控制,重試間隔時(shí)間為當(dāng)前重試次數(shù)的冪函數(shù)。
Playcat\Queue\Exceptions\DontRetry異常會忽略掉重試
其它
基于tp和swoole的隊(duì)列系統(tǒng)
playcat-queue-tpswoole
QQ:318274085