[solarseahorse] redis消息隊(duì)列插件

Webman Redis Queue 插件
簡介
webman-redis-queue
是為 Webman 框架設(shè)計(jì)的高效、靈活的 Redis
隊(duì)列插件。利用 Redis Stream 的強(qiáng)大特性,該插件專注于提供可靠和高性能的消息隊(duì)列解決方案,適合處理大規(guī)模的數(shù)據(jù)流和復(fù)雜的隊(duì)列操作。
主要特性
- 基于 Redis Stream: 使用 Redis 最新的 Stream 數(shù)據(jù)類型,為消息隊(duì)列和事件流提供優(yōu)化的存儲和訪問。
- 自定義異常重試: 支持自定義的消息處理失敗重試機(jī)制,提高消息處理的可靠性。
- 死信隊(duì)列處理: 集成死信隊(duì)列管理,確保消息不會(huì)因處理失敗而丟失。
- 延時(shí)隊(duì)列支持: 實(shí)現(xiàn)延時(shí)消息處理,使得定時(shí)任務(wù)和延遲執(zhí)行變得簡單易行。
- 高效的異常處理機(jī)制: 強(qiáng)化的異常處理策略,確保隊(duì)列的穩(wěn)定運(yùn)行。
安裝
通過 Composer 安裝 webman-redis-queue
:
composer require solarseahorse/webman-redis-queue:^1.0.1
版本變更記錄
v1.0.1 (20240128)
新增功能
-
刪除延時(shí)消息:
新增removeDelayedMessage
方法,允許移除一條延時(shí)消息。 -
批量刪除延時(shí)消息:
新增removeDelayedMessages
方法,允許一次性移除多個(gè)指定的延時(shí)消息。 -
檢查延時(shí)消息存在性:
新增hasDelayedMessageExists
方法,用于檢查延時(shí)消息是否存在。 -
批量檢查延時(shí)消息存在性:
新增hasDelayedMessagesExist
方法,用于批量檢查多個(gè)延時(shí)消息是否存在。
異常處理
- 引入新的異常類型:
為延時(shí)消息的移除和存在性檢查操作引入了DelayedMessageRemoveException
和DelayedMessageCheckException
異常類型。
文檔修正
- 修正文檔中的幾處錯(cuò)誤:
對插件的官方文檔進(jìn)行了更新,修正了之前版本中存在的一些描述不準(zhǔn)確和排版錯(cuò)誤。
測試和反饋
我們非常歡迎并鼓勵(lì)您在測試環(huán)境中嘗試這個(gè)插件,并且分享您的使用體驗(yàn)。您的反饋對我們改進(jìn)插件、修復(fù)潛在的問題以及發(fā)布未來的穩(wěn)定版本非常重要。如果您在使用過程中遇到任何問題或有任何建議,請通過 GitHub Issues
與我聯(lián)系。
參與貢獻(xiàn)
如果您對改進(jìn) webman-redis-queue 有興趣,歡迎任何形式的貢獻(xiàn),包括但不限于:提交問題、提供反饋、或直接向代碼庫提交改進(jìn)。您的貢獻(xiàn)將幫助我們更快地推出穩(wěn)定、功能豐富的正式版本。
配置
配置文件自動(dòng)生成在 config/plugin/solarseahorse/webman-redis-queue目錄下。
1. Redis配置 redis.php
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => null, // 密碼,字符串類型,可選參數(shù)
'db' => 0, // 數(shù)據(jù)庫
'prefix' => 'webman_redis_queue_', // key 前綴
'timeout' => 2, // Timeout
'ping' => 55, // Ping
'reconnect' => true, // 斷線重連
'max_retries' => 5, // 最大重連次數(shù)
'retry_interval' => 5 , // 重連間隔 s
]
],
];
在webman集群下,每個(gè)節(jié)點(diǎn)需要連接同一個(gè)redis。
斷線重連
注意:開啟此選項(xiàng)能增加隊(duì)列運(yùn)行穩(wěn)定性,但如果隊(duì)列進(jìn)程過多,redis恢復(fù)后可能造成突發(fā)大量連接數(shù),因?yàn)槊總€(gè)進(jìn)程都有一個(gè)redis連接。
默認(rèn)開啟,當(dāng)Redis發(fā)生重載
,重啟
等情況會(huì)嘗試重連,超過最大重試次數(shù)后會(huì)報(bào)錯(cuò)并重啟進(jìn)程(webman默認(rèn)行為)。
2. 日志配置 log.php
推薦為插件配置單獨(dú)日志通道,參考鏈接 webman日志
<?php
return [
'enable' => true, // 啟用日志
'handlers' => support\Log::channel('default') // 默認(rèn)通道 default
];
在隊(duì)列消費(fèi)業(yè)務(wù)邏輯中可以這樣使用日志,使用方法和官方的Log
類使用方法一致。
LogUtility::warning('Error:', [
'data' => $consumerMessage->getData(),
'errorMessage' => $e->getMessage()
]);
4. 隊(duì)列配置 process.php
- 在加載類名模式下,每個(gè)隊(duì)列都擁有獨(dú)立的運(yùn)行進(jìn)程。
- 每個(gè)隊(duì)列的配置和數(shù)據(jù)存儲KEY都是獨(dú)立的。
- 不推薦目錄模式是因?yàn)槎鄠€(gè)隊(duì)列共享進(jìn)程,其中某個(gè)隊(duì)列出現(xiàn)異常可能影響到其他隊(duì)列。
- 隊(duì)列的詳細(xì)配置都在消費(fèi)類中配置,配置文件只是基本的進(jìn)程配置。
<?php
return [
'send-email' => [
'handler' => SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess::class,
'count' => 20, // 在目錄模式中,目錄下所有隊(duì)列是共用進(jìn)程
'constructor' => [
// 支持目錄和類 推薦使用類名
'consumer_source' => \App\queue\test\Email::class
]
]
];
定義消費(fèi)類
插件對消費(fèi)類對位置沒有固定要求,符合加載規(guī)范即可。
教程以app/queue/SendEmail.php
舉例,目錄和文件需自行創(chuàng)建。
繼承 SolarSeahorse\WebmanRedisQueue\Consumer
,配置連接標(biāo)識,并實(shí)現(xiàn)抽象方法consume
, 一個(gè)最基礎(chǔ)的消費(fèi)類就創(chuàng)建好了。
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 連接標(biāo)識,對應(yīng)config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
protected string $connection = 'default';
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
}
}
編寫完成后需要在隊(duì)列配置文件
process.php
中新增隊(duì)列配置。
通過命令行創(chuàng)建
通過 php webman solar:make:consumer
命令可快速創(chuàng)建一個(gè)消費(fèi)類。
示例操作:
webman % php webman solar:make:consumer
Please enter the name of the queue: sendCode
Please enter the number of processes (default 1): 1
Please enter the path to create the class in [app/queue]: app/queue/test
最終將會(huì)在 app/queue/test
目錄中創(chuàng)建 SendCode.php
文件。
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendCode extends Consumer
{
// 連接標(biāo)識,對應(yīng)config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
protected string $connection = 'default';
// 消費(fèi)
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 獲取消息ID
$messageId = $consumerMessage->getMessageId();
// 獲取隊(duì)列數(shù)據(jù)
$data = $consumerMessage->getData();
var_dump($messageId);
}
}
隊(duì)列配置文件process.php
也會(huì)自動(dòng)更新。
<?php
return array (
'sendCode' =>
array (
'handler' => 'SolarSeahorse\\WebmanRedisQueue\\Process\\ConsumerProcess',
'count' => 1,
'constructor' =>
array (
'consumer_source' => 'app\\queue\\test\\SendCode',
),
),
);
配置屬性
protected string $connection = 'default';
- 連接標(biāo)識,用于指定 Redis 連接配置。
protected string $queueName = '';
- 隊(duì)列名稱,默認(rèn)自動(dòng)生成。
protected string $groupName = '';
- 隊(duì)列分組名,默認(rèn)自動(dòng)生成。
protected string $streamKey = '';
- Stream key,默認(rèn)自動(dòng)生成。
protected int $prefetchCount = 1;
- 返回消息的最大數(shù)量。默認(rèn)為1 不建議修改
- 消費(fèi)速度可通過提高進(jìn)程數(shù)并行處理消息,消費(fèi)者每次讀取多條數(shù)據(jù)是循環(huán)消費(fèi),極端情況如循環(huán)消費(fèi)一半進(jìn)程重啟會(huì)造成大量消息掛起。
protected int $blockTime = 5000;
- 當(dāng)無消息時(shí)堵塞等待的毫秒數(shù),也可作為無消息時(shí)的休眠時(shí)長。如果隊(duì)列以延時(shí)隊(duì)列為主,應(yīng)與延時(shí)隊(duì)列間隔相近。
protected float $consumerTimerInterval = 0.5;
- 消費(fèi)者處理間隔,消費(fèi)完一條消息后的等待時(shí)間(秒)。
protected int $maxAttempts = 5;
- 消費(fèi)失敗后的最大重試次數(shù)。
protected int $retrySeconds = 60;
- 重試間隔(秒)。
protected bool $autoAck = true;
- 是否自動(dòng)確認(rèn)消息。開啟的同時(shí)同樣建議在業(yè)務(wù)邏輯中顯式調(diào)用
ack
方法。
protected bool $autoDel = true;
- 是否自動(dòng)刪除已確認(rèn)成功的消息。
protected int $delayedQueueOnceHandlerCount = 128;
- 延時(shí)隊(duì)列每次處理數(shù)量,根據(jù)生產(chǎn)速率適當(dāng)配置。
protected int $delayedMessagesTimerInterval = 1;
- 延時(shí)消息處理間隔(秒)。
protected int $delayedMessagesMaxWorkerCount = 1;
- 延時(shí)隊(duì)列最大進(jìn)程數(shù),默認(rèn)單線程,只會(huì)在一個(gè)進(jìn)程開啟延時(shí)隊(duì)列處理。
protected string $delayedTaskSetKey = '';
- 延時(shí)隊(duì)列 SET KEY,默認(rèn)自動(dòng)生成。
protected string $delayedDataHashKey = '';
- 延時(shí)隊(duì)列 HASH KEY,默認(rèn)自動(dòng)生成。
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_RETRY;
- 消息掛起超時(shí)處理策略。
PENDING_PROCESSING_RETRY
或PENDING_PROCESSING_IGNORE
。 PENDING_PROCESSING_RETRY
當(dāng)消息掛起超時(shí)會(huì)進(jìn)行異常重試。PENDING_PROCESSING_IGNORE
當(dāng)消息掛起超時(shí)時(shí),觸發(fā)死信處理
方便排查錯(cuò)誤,除此之外只清理pending
列表,不做其他處理。- 默認(rèn)
PENDING_PROCESSING_RETRY
, 根據(jù)隊(duì)列場景選擇合適的處理策略,比如發(fā)送短信驗(yàn)證碼
,當(dāng)系統(tǒng)出現(xiàn)了崩潰等情況,恢復(fù)上線時(shí),一般情況下這類消息時(shí)不需要恢復(fù),此時(shí)重新給用戶發(fā)送驗(yàn)證碼沒有意義,但因?yàn)?code>Redis Stream特性,未ack的消息會(huì)在pending
列表中不會(huì)丟失,這類場景就適合配置PENDING_PROCESSING_IGNORE
protected int $pendingTimout = 300;
- 消息掛起超時(shí)時(shí)間(秒)。
- 在Redis Stream中當(dāng)消息被消費(fèi)者讀取,但沒有確認(rèn)(ACK)時(shí),消息會(huì)處于掛起狀態(tài)進(jìn)入
pending
列表。 - 如果消息處理緩慢,此值應(yīng)盡可能調(diào)大,避免將正常處理的消息當(dāng)成超時(shí)處理掉。
protected int $checkPendingTimerInterval = 60;
- 檢查 pending 列表的間隔時(shí)間(秒)。
protected int $onceCheckPendingCount = 50;
- 每次檢查 pending 列表的消息數(shù)量。
投遞消息
通過pushMessage
方法可快速向隊(duì)列投遞一條消息。
/**
* @param mixed|QueueMessageInterface $data
* @return string|bool
* @throws QueueMessagePushException
*/
public function pushMessage(string|array|int|QueueMessageInterface $data): string|bool;
// 消息內(nèi)容,無需序列化
$message = [
'dummy' => 'ok'
];
// 生產(chǎn)者工廠方法
$messageId = QueueProducerFactory::create(app\queue\test\SendEmail::class)
->pushMessage($message);
// 通過消費(fèi)類工廠方法 創(chuàng)建一個(gè)生產(chǎn)者
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);
// 投遞QueueMessage對象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 或者通過QueueMessageFactory創(chuàng)建一條消息
$message = QueueMessageFactory::create(app\queue\test\SendEmail::class,$message);
// 修改隊(duì)列數(shù)據(jù)
$message->setData(['dummy' => 'no']);
// 設(shè)置錯(cuò)誤次數(shù)
$message->setFailCount(3);
// 通過上方兩種方法投遞均可
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);
var_export($messageId); // 返回stream的字符串ID 或 false
有時(shí)候我們需要一次投遞大量隊(duì)列時(shí),可以通過pushMessages
方法,批量投遞消息,此方法會(huì)開啟Redis
的pipeline
管道投遞,提高與redis
的交互性能。
/**
* @param array|QueueMessageInterface[] $dataArray
* @return array|false
* @throws QueueMessagePushException
*/
public function pushMessages(array $dataArray): array|bool;
// 投遞5w條消息
$dataArr = array_fill(0, 50000, null);
for ($i = 0; $i < 50000; $i++) {
$dataArr[$i] = ['dummy' => uniqid()];
}
$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);
// QueueMessage方式
for ($i = 0; $i < 50000; $i++) {
$message = QueueMessageFactory::create(app\queue\test\SendEmail::class, ['dummy' => uniqid()]);
//$message->setData(json_encode(['123']));
//$message->setFailCount(1);
// ....
$dataArr[$i] = $message;
}
$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);
var_export($messageIds); // 返回Stream消息ID列表 或 false
數(shù)組投遞實(shí)際是通過數(shù)組創(chuàng)建一個(gè)
QueueMessage
對象
延時(shí)消息
延時(shí)消息的作用:
-
定時(shí)任務(wù):
延時(shí)消息可以用來實(shí)現(xiàn)定時(shí)任務(wù)。例如,你可能想在未來的某個(gè)時(shí)間點(diǎn)執(zhí)行特定操作,如發(fā)送提醒、更新狀態(tài)等。 -
延遲處理:
在某些情況下,立即處理消息并不理想或可能。延時(shí)消息允許應(yīng)用程序延遲處理,直到最合適的時(shí)機(jī)。 -
限流:
延時(shí)消息可以幫助對系統(tǒng)內(nèi)部的請求進(jìn)行限流,防止在短時(shí)間內(nèi)因大量請求而過載。 -
解耦和異步處理:
在復(fù)雜的系統(tǒng)中,延時(shí)消息可以用來解耦不同組件間的直接交互,提高系統(tǒng)的可擴(kuò)展性和維護(hù)性。
通過 scheduleDelayedMessage
方法快速投遞一條延時(shí)消息。
/**
* @param mixed|QueueMessageInterface $data
* @param int $delay
* @param string $identifier
* @return bool
* @throws ScheduleDelayedMessageException
*/
public function scheduleDelayedMessage(string|array|int|QueueMessageInterface $data, int $delay = 0, string $identifier = ''): bool;
// 消息內(nèi)容
$message = [
'type' => 'warning',
'to' => 'xxxx@email.com',
'content' => '.....'
];
// 投遞一條延時(shí)消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60);
// QueueMessage對象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 設(shè)置延時(shí)
$message->setDelay(60);
// 投遞一條延時(shí)消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);
// 使用第二個(gè)參數(shù)會(huì)替換之前對象的延時(shí)設(shè)置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message,80);
如果我們想避免消息被重復(fù)發(fā)送等情況,通過延時(shí)隊(duì)列的特性可以很簡單實(shí)現(xiàn)。通過scheduleDelayedMessage
方法的第三個(gè)參數(shù)identifier
傳遞一個(gè)自定義的延時(shí)消息ID,同樣的消息ID,消息將會(huì)被替換,延時(shí)時(shí)間從修改開始重新計(jì)算。
如果消息已經(jīng)進(jìn)入stream隊(duì)列將無法實(shí)現(xiàn)替換,必須在延時(shí)時(shí)間內(nèi),類似實(shí)現(xiàn)一個(gè)“防抖”效果,消息在時(shí)間段內(nèi)發(fā)送多次最終只處理一次。
// 消息內(nèi)容
$message = [
'type' => 'warning',
'to' => 'xxxx@email.com',
'content' => '.....'
];
// 通過type,to參數(shù)生成一個(gè)唯一ID
$identifier = md5(serialize([
'type' => 'warning',
'to' => 'xxxx@email.com',
]));
// 投遞一條延時(shí)消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60, $identifier);
// QueueMessage對象
$message = app\queue\test\SendEmail::createQueueMessage($message);
// 設(shè)置延時(shí)
$message->setDelay(60);
// 設(shè)置identifier
$message->setIdentifier($identifier);
// 投遞一條延時(shí)消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);
// 傳遞參數(shù)會(huì)替換對象之前的延時(shí)和ID設(shè)置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 80, $identifier);
當(dāng)一次需要投遞大量延時(shí)消息時(shí),可以通過scheduleDelayedMessages
方法發(fā)送。
// 投遞10w條延時(shí)消息
$dataArr = array_fill(0, 100000, null);
for ($i = 0; $i < 100000; $i++) {
$dataArr[$i] = [
'delay' => 2, // 延時(shí)時(shí)間
'data' => ['dummy' => uniqid()], // 隊(duì)列數(shù)據(jù)
'identifier' => '' // 自定義ID
];
}
// 批量投遞
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);
// QueueMessage對象
for ($i = 0; $i < 100000; $i++) {
$message = app\queue\test\SendEmail::createQueueMessage(['dummy' => uniqid()]);
// 設(shè)置延時(shí)
$message->setDelay(60);
// 設(shè)置identifier
$message->setIdentifier('');
$dataArr[$i] = $message;
}
// 批量投遞
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);
多redis只需要在隊(duì)列配置
connection
連接標(biāo)識,投遞方式?jīng)]有任何變化。
移除延時(shí)隊(duì)列消息
新功能 (v1.0.1)
以下功能在插件的
v1.0.1
版本中新增。
有時(shí)候我們想移除某個(gè)或多個(gè)延時(shí)隊(duì)列時(shí),可以使用removeDelayedMessage
和removeDelayedMessages
方法實(shí)現(xiàn),使用hasDelayedMessageExists
和hasDelayedMessagesExist
判斷一條或多條延時(shí)消息是否存在。
只有任務(wù)還存在延時(shí)隊(duì)列中才能移除,如果已經(jīng)進(jìn)入
Stream
隊(duì)列中將無法移除。
/**
* @param string $identifier
* @return bool
*/
public function removeDelayedMessage(string $identifier): bool;
/**
* @param array $identifiers
* @return bool|array
*/
public function removeDelayedMessages(array $identifiers): array|bool;
/**
* @param string $identifier
* @return bool
*/
public function hasDelayedMessageExists(string $identifier): bool;
/**
* @param array $identifiers
* @return bool|array
*/
public function hasDelayedMessagesExist(array $identifiers): array|bool;
代碼示例:
$consumer = new app\queue\test\SendEmail();
$queueProducer = $consumer::createQueueProducer();
// 添加一條延時(shí)消息 通過業(yè)務(wù)數(shù)據(jù)生成消息ID
$queueProducer->scheduleDelayedMessage(['dummy' => 'ok'], 60, 'email_user_id');
// 通過QueueMessage對象
$queueMessage = $consumer::createQueueMessage(['dummy' => 'ok']);
$queueMessage->setDelay(60);
// 自定義消息ID 不設(shè)置將默認(rèn)生成 通過getIdentifier()獲取
$queueMessage->setIdentifier('test_id');
// 獲取消息ID
$id = $queueMessage->getIdentifier();
// 判斷消息是否存在
var_export(SendEmail::createQueueProducer()->hasDelayedMessageExists('identifier')); // true or false
// 移除一條延時(shí)消息
var_export(SendEmail::createQueueProducer()->removeDelayedMessage('identifier')); // true or false
// 判斷多條消息是否存在 返回一個(gè)數(shù)組
var_export(SendEmail::createQueueProducer()->hasDelayedMessagesExist(['identifier1', 'identifier1', 'identifier1']));
//.array (
// 0 => 1706383223.0,
// 1 => 1706383223.0,
// 2 => false
//).
// 移除多條延時(shí)消息 返回一個(gè)數(shù)組
var_export(SendEmail::createQueueProducer()->removeDelayedMessages(['identifier1', 'identifier1', 'identifier1']));
//
//.array (
// 0 => 1,
// 1 => 1,
// 2 => 0
//).
消費(fèi)消息
消費(fèi)消息時(shí)會(huì)調(diào)用消費(fèi)類的consume
方法,并傳遞一個(gè)實(shí)現(xiàn)ConsumerMessageInterface
接口對象。
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 連接標(biāo)識,對應(yīng)redis.php的配置 默認(rèn)default
protected string $connection = 'default';
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 獲取消息ID
$messageId = $consumerMessage->getMessageId();
// 獲取隊(duì)列數(shù)據(jù)
$data = $consumerMessage->getData();
// 禁用錯(cuò)誤重試 如果消費(fèi)失敗將不會(huì)異常重試
$consumerMessage->disableFailRetry();
// 手動(dòng)觸發(fā)錯(cuò)誤重試,此方法會(huì)調(diào)用disableFailRetry方法,所以后續(xù)報(bào)錯(cuò)不會(huì)再觸發(fā)異常重試。
// 沒有禁用錯(cuò)誤重試的情況下,消費(fèi)異常默認(rèn)會(huì)調(diào)用此方法。
$consumerMessage->triggerError(new \Exception('triggerError'));
// 監(jiān)聽消費(fèi)異常事件
$consumerMessage->onError(function (\Throwable $e, ConsumerMessageInterface $consumerMessage) {
// 這里可以處理消費(fèi)異常邏輯
// 禁用錯(cuò)誤重試
$consumerMessage->disableFailRetry();
// 添加日志等等
// 如果在消費(fèi)方法中自行捕獲 Throwable 此事件不會(huì)觸發(fā)
});
// 業(yè)務(wù)邏輯執(zhí)行完畢,ack確認(rèn)消息 默認(rèn)自動(dòng)ack,但通常建議在業(yè)務(wù)邏輯中顯式調(diào)用,比如ack失敗進(jìn)行事務(wù)回滾等等。
$isAcked = $consumerMessage->ack();
if (!$isAcked) {
}
// 或通過getAckStatus方法獲取結(jié)果
if (!$consumerMessage->getAckStatus()) {
}
// 獲取原始隊(duì)列消息 QueueMessage對象
$queueMessage = $consumerMessage->getQueueMessage();
// 獲取消息錯(cuò)誤次數(shù)...
$failCount = $queueMessage->getFailCount();
// 更多...
}
}
上方示例主要演示可調(diào)用的方法,下面使用一個(gè)更加貼合實(shí)際的demo,更快了解消費(fèi)業(yè)務(wù)邏輯的編寫。
發(fā)送郵件驗(yàn)證碼
場景特點(diǎn):獲取驗(yàn)證碼的操作一般由用戶手動(dòng)觸發(fā),在這類場景中,錯(cuò)誤重試應(yīng)用戶在前端UI倒計(jì)時(shí)結(jié)束后重新手動(dòng)發(fā)起,如果業(yè)務(wù)出現(xiàn)崩潰,再次上線后重新發(fā)送驗(yàn)證碼給用戶已經(jīng)沒有意義了。我們可以通過配置適應(yīng)這類場景,代碼示例:
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
class SendEmail extends Consumer
{
// 連接標(biāo)識,對應(yīng)redis.php的配置 默認(rèn)default
protected string $connection = 'default';
// 將pending處理策略調(diào)整為PENDING_PROCESSING_IGNORE 消息掛起超時(shí)將不會(huì)進(jìn)行重試
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 獲取消息ID
$messageId = $consumerMessage->getMessageId();
// 獲取隊(duì)列數(shù)據(jù)
$data = $consumerMessage->getData();
// 監(jiān)聽異常
$consumerMessage->onError(function (\Throwable $e){
// 記錄郵件發(fā)送失敗日志
});
// 禁用重試
$consumerMessage->disableFailRetry();
// 發(fā)送一封郵件 ....
// 確認(rèn)消息
$consumerMessage->ack();
}
}
自定義錯(cuò)誤重試
消費(fèi)類繼承的抽象類Consumer
默認(rèn)實(shí)現(xiàn)了handlerFailRetry
方法,在觸發(fā)異常重試時(shí),會(huì)調(diào)用此方法,如果您想自定義錯(cuò)誤重試邏輯,或加入更多自定義的處理,在本插件中可以輕松實(shí)現(xiàn),并且每個(gè)隊(duì)列都支持自定義配置。
/**
* 處理錯(cuò)誤重試
* @param $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param Throwable $e
* @return bool
* @throws ScheduleDelayedMessageException
* @throws RedisException
* @throws Throwable
*/
public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
{
$queueMessage = $consumerMessage->getQueueMessage();
// 檢查是否超過最大重試次數(shù)
if ($queueMessage->getFailCount() >= $this->maxAttempts) {
// 死信處理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
return true;
}
$queueMessage->incrementFailCount(); // Fail count + 1
// 計(jì)算下次重試時(shí)間
$retrySeconds = $queueMessage->getFailCount() * $this->retrySeconds;
// 更新下次重試時(shí)間
$queueMessage->updateNextRetry($retrySeconds);
// 設(shè)置消息延時(shí)
$queueMessage->setDelay($retrySeconds);
// 設(shè)置消息ID 避免重復(fù)任務(wù)
$queueMessage->setIdentifier($messageId);
// 重新發(fā)布至延時(shí)隊(duì)列
return self::createQueueProducer()->scheduleDelayedMessage($queueMessage);
}
默認(rèn)實(shí)現(xiàn)的代碼如上,我們只需要重寫此方法就可以自定義錯(cuò)誤處理的業(yè)務(wù)邏輯。
代碼示例:
<?php
namespace app\queue\test;
use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
use Throwable;
class SendEmail extends Consumer
{
// 連接標(biāo)識,對應(yīng)redis.php的配置 默認(rèn)default
protected string $connection = 'default';
// 將pending處理策略調(diào)整為PENDING_PROCESSING_IGNORE 消息掛起超時(shí)將不會(huì)進(jìn)行重試
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
public function consume(ConsumerMessageInterface $consumerMessage)
{
// TODO: Implement consume() method.
// 獲取消息ID
$messageId = $consumerMessage->getMessageId();
// 獲取隊(duì)列數(shù)據(jù)
$data = $consumerMessage->getData();
// 監(jiān)聽異常
$consumerMessage->onError(function (\Throwable $e){
// 記錄郵件發(fā)送失敗日志
});
// 禁用重試
$consumerMessage->disableFailRetry();
// 發(fā)送一封郵件 ....
// 確認(rèn)消息
$consumerMessage->ack();
}
public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
{
// 不改動(dòng)原本的錯(cuò)誤處理 也可以完全自定義實(shí)現(xiàn)。
parent::handlerFailRetry($messageId, $consumerMessage, $e);
// 如果隊(duì)列在業(yè)務(wù)數(shù)據(jù)庫中還有一個(gè)tasks表進(jìn)行調(diào)度,在這里可以更新task數(shù)據(jù) 比如 錯(cuò)誤次數(shù)+1
}
}
自定義死信處理
在handlerFailRetry
方法中,默認(rèn)有這一段:
// 檢查是否超過最大重試次數(shù)
if ($queueMessage->getFailCount() >= $this->maxAttempts) {
// 死信處理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
return true;
}
那么,我們?nèi)绻枰远x死信處理或加入額外的業(yè)務(wù)邏輯可以通過重寫handlerDeadLetterQueue
方法實(shí)現(xiàn)。
protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
當(dāng)我們設(shè)置pending處理策略為PENDING_PROCESSING_IGNORE
時(shí),消息如果掛起超時(shí),將不會(huì)觸發(fā)異常重試,而是直接調(diào)用死信處理。默認(rèn)情況下,死信處理會(huì)新增一條日志,方便排查問題。
默認(rèn)情況下需要配置有效的日志(log.php) 默認(rèn)行為才有效。也可以通過重寫方法完全自行實(shí)現(xiàn),記錄在業(yè)務(wù)的數(shù)據(jù)庫中,這也是推薦的做法,可以針對業(yè)務(wù)實(shí)現(xiàn)更加靈活的異常處理。
/**
* 處理死信 超過最大重試次數(shù)或pending超時(shí)PENDING_PROCESSING_IGNORE策略 會(huì)調(diào)用此方法
* @param $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param Throwable $e
* @return void
*/
public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
{
$queueMessage = $consumerMessage->getQueueMessage();
// 添加日志
LogUtility::warning('dead_letter_queue: ', [
'messageId' => $messageId,
'message' => $queueMessage->toArray(),
'failCount' => $queueMessage->getFailCount(),
'errorMsg' => $e->getMessage(),
'trace' => $e->getTraceAsString()
]);
// 更多...
}
代碼示例:
public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
{
// 保持默認(rèn)行為
parent::handlerDeadLetterQueue($messageId, $consumerMessage, $e); // TODO: Change the autogenerated stub
// 如果隊(duì)列在業(yè)務(wù)數(shù)據(jù)庫中還有一個(gè)tasks表進(jìn)行調(diào)度,在這里可以更新task數(shù)據(jù)
}
自定義pending超時(shí)處理
抽象類Consumer
中,默認(rèn)定義了handlerPendingTimeoutMessages
方法,用于處理pending超時(shí)的消息。
消費(fèi)者讀取了一條消息后,消息會(huì)進(jìn)入pending
列表,不會(huì)被當(dāng)前和其他消費(fèi)者再次讀取,當(dāng)業(yè)務(wù)邏輯沒有執(zhí)行完畢,服務(wù)出現(xiàn)掉線,崩潰時(shí),消息并沒有ack
,消息會(huì)一直保存在pending
列表中,pending
列表只能通過ack
移除,如果長期不處理,可能造成pending
列表堆積,造成大量內(nèi)存占用,當(dāng)持續(xù)時(shí)間大于$pendingTimout
屬性的時(shí)間(默認(rèn)300秒),會(huì)調(diào)用此方法進(jìn)行處理。
默認(rèn)情況下,在
PENDING_PROCESSING_IGNORE
策略中,我們認(rèn)為pending超時(shí)消息是死信,不會(huì)再次處理,PENDING_PROCESSING_RETRY
會(huì)進(jìn)行異常重試。
/**
* 處理消息掛起超時(shí) 當(dāng)pending列表中有超時(shí)未ack的消息會(huì)觸發(fā)此方法
* @param string $messageId
* @param ConsumerMessageInterface $consumerMessage
* @param string $consumerName
* @param int $elapsedTime
* @param int $deliveryCount
* @return void
* @throws RedisException
* @throws ScheduleDelayedMessageException
* @throws Throwable
*/
public function handlerPendingTimeoutMessages(string $messageId, ConsumerMessageInterface $consumerMessage, string $consumerName, int $elapsedTime, int $deliveryCount): void
{
switch ($this->getPendingProcessingStrategy()) {
case self::PENDING_PROCESSING_IGNORE: // 忽略pending超時(shí)
// 確認(rèn)消息
$consumerMessage->ack();
// 觸發(fā)死信處理
$this->handlerDeadLetterQueue($messageId, $consumerMessage, new Exception(
'PENDING_PROCESSING_IGNORE: Message pending timeout.'
));
break;
case self::PENDING_PROCESSING_RETRY: // pending超時(shí)重試
// 觸發(fā)死信處理
if ($deliveryCount + 1 > $this->getMaxAttempts()) {
// ack消息
$consumerMessage->ack();
$this->handlerDeadLetterQueue(
$messageId,
$consumerMessage,
new Exception(
'PENDING_PROCESSING_RETRY: The number of message delivery times exceeds the maximum number of retries.'
));
return;
}
// 處理重試
$handlerStatus = $this->handlerFailRetry(
$messageId,
$consumerMessage,
new Exception('PENDING_PROCESSING_RETRY: Message pending timeout retry.')
);
if ($handlerStatus) {
$consumerMessage->ack();
}
break;
}
}
獲取隊(duì)列的redis連接
有時(shí)候我們需要操作或維護(hù)隊(duì)列時(shí),可以直接獲取隊(duì)列的Redis連接進(jìn)行操作,比如編寫自定義腳本等。
// 獲取隊(duì)列的Redis連接
$sendCode = new app\queue\test\SendCode();
$redisConnection = $sendCode->getRedisConnection();
// 使用方法和phpredis擴(kuò)展一致
$redisConnection->xLen();
$redisConnection->sAdd();
// 在消費(fèi)類中可以直接使用$this->getRedisConnection();
....更多
命令行
php webman solar:make:consumer
- 創(chuàng)建一個(gè)消費(fèi)者
- 它將引導(dǎo)你創(chuàng)建一個(gè)基本的消費(fèi)者類
php webman solar:remove:consumer
- 移除一個(gè)消費(fèi)者
- 它將引導(dǎo)你移除消費(fèi)者類
- 注意:它會(huì)移除redis中關(guān)于此消費(fèi)者的所有數(shù)據(jù),如果你只是想移除類和配置,請不要使用此命令。
php webman solar:clean:redis:data
- 清理某個(gè)消費(fèi)者的Redis數(shù)據(jù)
- 它將引導(dǎo)你清理redis數(shù)據(jù)
- 注意:它將刪除redis中關(guān)于此消費(fèi)者的所有數(shù)據(jù),請謹(jǐn)慎操作。
php webman solar:consumer:list
- 獲取當(dāng)前全部消費(fèi)者信息,包含如下信息:
Key
標(biāo)識Handler
進(jìn)程類Count
進(jìn)程數(shù)Consumer
消費(fèi)者類名Stream Length
當(dāng)前隊(duì)列總長度(不包含Pending列表中的數(shù)量)Delay Set Length
當(dāng)前延時(shí)隊(duì)列任務(wù)數(shù)Pending List Length
當(dāng)前Pending列表長度
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
| Key | Handler | Count | Consumer | Stream Length | Delay Set Length | Pending List Length |
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
| SendCode | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 20 | app\queue\test\SendCode | 1996 | 950 | >=500 |
| SendEmail | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 20 | app\queue\test\SendEmail | 0 | 0 | 0 |
| SendSmsCode | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 1 | app\queue\SendSmsCode | 0 | 0 | 0 |
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
處理歷史消息
使用場景:
- 在極端情況下業(yè)務(wù)執(zhí)行完畢并且ack成功,但是刪除消息時(shí)出現(xiàn)異常,消息保留在
stream
中,一般少量數(shù)據(jù)時(shí)我們無需在意,但如果堆積數(shù)量過大可能造成內(nèi)存占用和性能問題。 - 當(dāng)你需要處理歷史消息,或者重新處理之前已經(jīng)處理過的消息。
- 當(dāng)你需要對 Stream 中的歷史數(shù)據(jù)進(jìn)行分析或生成報(bào)告。
當(dāng)
autoDel
屬性為true
時(shí),消息會(huì)自動(dòng)刪除,無法對歷史數(shù)據(jù)進(jìn)行處理和分析,如果業(yè)務(wù)需要對歷史隊(duì)列消息進(jìn)行回溯請?jiān)O(shè)置為false
代碼示例:
這里我們使用了webman
中自定義腳本
的編寫,可以將腳本加入定時(shí)任務(wù)中,定期清理或處理歷史消息。
下方代碼只是示例,請確保在測試環(huán)境充分測試。
<?php
use SolarSeahorse\WebmanRedisQueue\Queue\QueueMessage;
require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/../support/bootstrap.php';
// 獲取隊(duì)列的Redis連接
$sendCode = new app\queue\test\SendCode();
$redisConnection = $sendCode->getRedisConnection();
// 使用方法和phpredis擴(kuò)展一致
$streamKey = $sendCode->getStreamKey();
$start = '-'; // 表示從 Stream 的最開始讀取
$end = '+'; // 表示讀取到 Stream 的最末尾
$count = 100; // 指定要讀取的消息數(shù)量
// 讀取Stream列表,不包括pending
$messages = $redisConnection->xRange($streamKey, $start, $end, $count);
$deleteMessageIds = [];
foreach ($messages as $messageId => $message) {
// 解析原始消息內(nèi)容
$messageArr = QueueMessage::parseRawMessage($message);
if (!$messageArr) { // 未知消息
$deleteMessageIds[] = $messageId;
continue;
}
// 轉(zhuǎn)換為QueueMessage方便操作
$queueMessage = QueueMessage::createFromArray($messageArr);
// 通過獲取消息時(shí)間戳,如果消息已經(jīng)存在超過1個(gè)小時(shí) 標(biāo)記刪除。
if (time() - $queueMessage->getTimestamp() > 3600) {
$deleteMessageIds[] = $messageId;
}
}
// 批量刪除消息
$redisConnection->xDel($streamKey, $deleteMessageIds);
在其他項(xiàng)目投遞消息
目前插件沒有實(shí)現(xiàn)在其他項(xiàng)目投遞的標(biāo)準(zhǔn)實(shí)現(xiàn),可通過業(yè)務(wù)需求開發(fā)隊(duì)列提交接口實(shí)現(xiàn)。