国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

[solarseahorse] redis消息隊(duì)列插件

v1.0.1 版本
2024-01-27 版本更新時(shí)間
404 安裝
6 star

Webman Redis Queue 插件

簡介

webman-redis-queue 是為 Webman 框架設(shè)計(jì)的高效、靈活的 Redis
隊(duì)列插件。利用 Redis Stream 的強(qiáng)大特性,該插件專注于提供可靠和高性能的消息隊(duì)列解決方案,適合處理大規(guī)模的數(shù)據(jù)流和復(fù)雜的隊(duì)列操作。

主要特性

  • 基于 Redis Stream: 使用 Redis 最新的 Stream 數(shù)據(jù)類型,為消息隊(duì)列和事件流提供優(yōu)化的存儲和訪問。
  • 自定義異常重試: 支持自定義的消息處理失敗重試機(jī)制,提高消息處理的可靠性。
  • 死信隊(duì)列處理: 集成死信隊(duì)列管理,確保消息不會(huì)因處理失敗而丟失。
  • 延時(shí)隊(duì)列支持: 實(shí)現(xiàn)延時(shí)消息處理,使得定時(shí)任務(wù)和延遲執(zhí)行變得簡單易行。
  • 高效的異常處理機(jī)制: 強(qiáng)化的異常處理策略,確保隊(duì)列的穩(wěn)定運(yùn)行。

安裝

通過 Composer 安裝 webman-redis-queue

composer require solarseahorse/webman-redis-queue:^1.0.1

版本變更記錄

v1.0.1 (20240128)

新增功能

  • 刪除延時(shí)消息
    新增 removeDelayedMessage 方法,允許移除一條延時(shí)消息。

  • 批量刪除延時(shí)消息
    新增 removeDelayedMessages 方法,允許一次性移除多個(gè)指定的延時(shí)消息。

  • 檢查延時(shí)消息存在性
    新增 hasDelayedMessageExists 方法,用于檢查延時(shí)消息是否存在。

  • 批量檢查延時(shí)消息存在性
    新增 hasDelayedMessagesExist 方法,用于批量檢查多個(gè)延時(shí)消息是否存在。

異常處理

  • 引入新的異常類型
    為延時(shí)消息的移除和存在性檢查操作引入了 DelayedMessageRemoveExceptionDelayedMessageCheckException 異常類型。

文檔修正

  • 修正文檔中的幾處錯(cuò)誤
    對插件的官方文檔進(jìn)行了更新,修正了之前版本中存在的一些描述不準(zhǔn)確和排版錯(cuò)誤。

測試和反饋

我們非常歡迎并鼓勵(lì)您在測試環(huán)境中嘗試這個(gè)插件,并且分享您的使用體驗(yàn)。您的反饋對我們改進(jìn)插件、修復(fù)潛在的問題以及發(fā)布未來的穩(wěn)定版本非常重要。如果您在使用過程中遇到任何問題或有任何建議,請通過 GitHub Issues
與我聯(lián)系。

參與貢獻(xiàn)

如果您對改進(jìn) webman-redis-queue 有興趣,歡迎任何形式的貢獻(xiàn),包括但不限于:提交問題、提供反饋、或直接向代碼庫提交改進(jìn)。您的貢獻(xiàn)將幫助我們更快地推出穩(wěn)定、功能豐富的正式版本。

配置

配置文件自動(dòng)生成在 config/plugin/solarseahorse/webman-redis-queue目錄下。

1. Redis配置 redis.php

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => null,       // 密碼,字符串類型,可選參數(shù)
            'db' => 0,            // 數(shù)據(jù)庫
            'prefix' => 'webman_redis_queue_',      // key 前綴
            'timeout' => 2, // Timeout
            'ping' => 55,             // Ping
            'reconnect' => true,  // 斷線重連
            'max_retries' => 5, // 最大重連次數(shù)
            'retry_interval' => 5 , // 重連間隔 s
        ]
    ],
];

在webman集群下,每個(gè)節(jié)點(diǎn)需要連接同一個(gè)redis。

斷線重連

注意:開啟此選項(xiàng)能增加隊(duì)列運(yùn)行穩(wěn)定性,但如果隊(duì)列進(jìn)程過多,redis恢復(fù)后可能造成突發(fā)大量連接數(shù),因?yàn)槊總€(gè)進(jìn)程都有一個(gè)redis連接。

默認(rèn)開啟,當(dāng)Redis發(fā)生重載,重啟等情況會(huì)嘗試重連,超過最大重試次數(shù)后會(huì)報(bào)錯(cuò)并重啟進(jìn)程(webman默認(rèn)行為)。

2. 日志配置 log.php

推薦為插件配置單獨(dú)日志通道,參考鏈接 webman日志


<?php

return [
    'enable' => true, // 啟用日志
    'handlers' => support\Log::channel('default') // 默認(rèn)通道 default
];

在隊(duì)列消費(fèi)業(yè)務(wù)邏輯中可以這樣使用日志,使用方法和官方的Log類使用方法一致。


LogUtility::warning('Error:', [
      'data' => $consumerMessage->getData(),
      'errorMessage' => $e->getMessage()
]);

4. 隊(duì)列配置 process.php

  1. 在加載類名模式下,每個(gè)隊(duì)列都擁有獨(dú)立的運(yùn)行進(jìn)程。
  2. 每個(gè)隊(duì)列的配置和數(shù)據(jù)存儲KEY都是獨(dú)立的。
  3. 不推薦目錄模式是因?yàn)槎鄠€(gè)隊(duì)列共享進(jìn)程,其中某個(gè)隊(duì)列出現(xiàn)異常可能影響到其他隊(duì)列。
  4. 隊(duì)列的詳細(xì)配置都在消費(fèi)類中配置,配置文件只是基本的進(jìn)程配置。
<?php

return [
    'send-email' => [
        'handler' => SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess::class,
        'count' => 20, // 在目錄模式中,目錄下所有隊(duì)列是共用進(jìn)程
        'constructor' => [
            // 支持目錄和類 推薦使用類名
            'consumer_source' => \App\queue\test\Email::class
        ]
    ]
];

定義消費(fèi)類

插件對消費(fèi)類對位置沒有固定要求,符合加載規(guī)范即可。

教程以app/queue/SendEmail.php舉例,目錄和文件需自行創(chuàng)建。

繼承 SolarSeahorse\WebmanRedisQueue\Consumer,配置連接標(biāo)識,并實(shí)現(xiàn)抽象方法consume, 一個(gè)最基礎(chǔ)的消費(fèi)類就創(chuàng)建好了。

<?php

namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendEmail extends Consumer
{
    // 連接標(biāo)識,對應(yīng)config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
    protected string $connection = 'default';

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.
    }
}

編寫完成后需要在隊(duì)列配置文件process.php中新增隊(duì)列配置。

通過命令行創(chuàng)建

通過 php webman solar:make:consumer 命令可快速創(chuàng)建一個(gè)消費(fèi)類。

示例操作:

webman % php webman solar:make:consumer

Please enter the name of the queue: sendCode

Please enter the number of processes (default 1): 1

Please enter the path to create the class in [app/queue]: app/queue/test

最終將會(huì)在 app/queue/test 目錄中創(chuàng)建 SendCode.php 文件。

<?php

namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendCode extends Consumer
{
    // 連接標(biāo)識,對應(yīng)config/plugin/solarseahorse/webman-redis-queue/redis.php的配置
    protected string $connection = 'default';

    // 消費(fèi)
    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 獲取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 獲取隊(duì)列數(shù)據(jù)
        $data = $consumerMessage->getData();

        var_dump($messageId);
    }
}

隊(duì)列配置文件process.php也會(huì)自動(dòng)更新。

<?php

return array (
  'sendCode' => 
  array (
    'handler' => 'SolarSeahorse\\WebmanRedisQueue\\Process\\ConsumerProcess',
    'count' => 1,
    'constructor' => 
    array (
      'consumer_source' => 'app\\queue\\test\\SendCode',
    ),
  ),
);

配置屬性

protected string $connection = 'default';

  • 連接標(biāo)識,用于指定 Redis 連接配置。

protected string $queueName = '';

  • 隊(duì)列名稱,默認(rèn)自動(dòng)生成。

protected string $groupName = '';

  • 隊(duì)列分組名,默認(rèn)自動(dòng)生成。

protected string $streamKey = '';

  • Stream key,默認(rèn)自動(dòng)生成。

protected int $prefetchCount = 1;

  • 返回消息的最大數(shù)量。默認(rèn)為1 不建議修改
  • 消費(fèi)速度可通過提高進(jìn)程數(shù)并行處理消息,消費(fèi)者每次讀取多條數(shù)據(jù)是循環(huán)消費(fèi),極端情況如循環(huán)消費(fèi)一半進(jìn)程重啟會(huì)造成大量消息掛起。

protected int $blockTime = 5000;

  • 當(dāng)無消息時(shí)堵塞等待的毫秒數(shù),也可作為無消息時(shí)的休眠時(shí)長。如果隊(duì)列以延時(shí)隊(duì)列為主,應(yīng)與延時(shí)隊(duì)列間隔相近。

protected float $consumerTimerInterval = 0.5;

  • 消費(fèi)者處理間隔,消費(fèi)完一條消息后的等待時(shí)間(秒)。

protected int $maxAttempts = 5;

  • 消費(fèi)失敗后的最大重試次數(shù)。

protected int $retrySeconds = 60;

  • 重試間隔(秒)。

protected bool $autoAck = true;

  • 是否自動(dòng)確認(rèn)消息。開啟的同時(shí)同樣建議在業(yè)務(wù)邏輯中顯式調(diào)用 ack方法。

protected bool $autoDel = true;

  • 是否自動(dòng)刪除已確認(rèn)成功的消息。

protected int $delayedQueueOnceHandlerCount = 128;

  • 延時(shí)隊(duì)列每次處理數(shù)量,根據(jù)生產(chǎn)速率適當(dāng)配置。

protected int $delayedMessagesTimerInterval = 1;

  • 延時(shí)消息處理間隔(秒)。

protected int $delayedMessagesMaxWorkerCount = 1;

  • 延時(shí)隊(duì)列最大進(jìn)程數(shù),默認(rèn)單線程,只會(huì)在一個(gè)進(jìn)程開啟延時(shí)隊(duì)列處理。

protected string $delayedTaskSetKey = '';

  • 延時(shí)隊(duì)列 SET KEY,默認(rèn)自動(dòng)生成。

protected string $delayedDataHashKey = '';

  • 延時(shí)隊(duì)列 HASH KEY,默認(rèn)自動(dòng)生成。

protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_RETRY;

  • 消息掛起超時(shí)處理策略。PENDING_PROCESSING_RETRYPENDING_PROCESSING_IGNORE。
  • PENDING_PROCESSING_RETRY 當(dāng)消息掛起超時(shí)會(huì)進(jìn)行異常重試。
  • PENDING_PROCESSING_IGNORE 當(dāng)消息掛起超時(shí)時(shí),觸發(fā)死信處理方便排查錯(cuò)誤,除此之外只清理pending列表,不做其他處理。
  • 默認(rèn) PENDING_PROCESSING_RETRY , 根據(jù)隊(duì)列場景選擇合適的處理策略,比如發(fā)送短信驗(yàn)證碼
    ,當(dāng)系統(tǒng)出現(xiàn)了崩潰等情況,恢復(fù)上線時(shí),一般情況下這類消息時(shí)不需要恢復(fù),此時(shí)重新給用戶發(fā)送驗(yàn)證碼沒有意義,但因?yàn)?code>Redis Stream特性,未ack的消息會(huì)在pending列表中不會(huì)丟失,這類場景就適合配置PENDING_PROCESSING_IGNORE

protected int $pendingTimout = 300;

  • 消息掛起超時(shí)時(shí)間(秒)。
  • 在Redis Stream中當(dāng)消息被消費(fèi)者讀取,但沒有確認(rèn)(ACK)時(shí),消息會(huì)處于掛起狀態(tài)進(jìn)入pending列表。
  • 如果消息處理緩慢,此值應(yīng)盡可能調(diào)大,避免將正常處理的消息當(dāng)成超時(shí)處理掉。

protected int $checkPendingTimerInterval = 60;

  • 檢查 pending 列表的間隔時(shí)間(秒)。

protected int $onceCheckPendingCount = 50;

  • 每次檢查 pending 列表的消息數(shù)量。

投遞消息

通過pushMessage方法可快速向隊(duì)列投遞一條消息。


/**
 * @param mixed|QueueMessageInterface $data
 * @return string|bool
 * @throws QueueMessagePushException
 */

 public function pushMessage(string|array|int|QueueMessageInterface $data): string|bool;

// 消息內(nèi)容,無需序列化
$message = [
    'dummy' => 'ok'
];

// 生產(chǎn)者工廠方法
$messageId = QueueProducerFactory::create(app\queue\test\SendEmail::class)
    ->pushMessage($message);

// 通過消費(fèi)類工廠方法 創(chuàng)建一個(gè)生產(chǎn)者
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);

// 投遞QueueMessage對象
$message = app\queue\test\SendEmail::createQueueMessage($message);

// 或者通過QueueMessageFactory創(chuàng)建一條消息
$message = QueueMessageFactory::create(app\queue\test\SendEmail::class,$message);

// 修改隊(duì)列數(shù)據(jù)
$message->setData(['dummy' => 'no']);

// 設(shè)置錯(cuò)誤次數(shù)
$message->setFailCount(3);

// 通過上方兩種方法投遞均可
$messageId = app\queue\test\SendEmail::createQueueProducer()->pushMessage($message);

var_export($messageId); // 返回stream的字符串ID 或 false

有時(shí)候我們需要一次投遞大量隊(duì)列時(shí),可以通過pushMessages方法,批量投遞消息,此方法會(huì)開啟Redispipeline
管道投遞,提高與redis的交互性能。


/**
 * @param array|QueueMessageInterface[] $dataArray
 * @return array|false
 * @throws QueueMessagePushException
 */

 public function pushMessages(array $dataArray): array|bool;

// 投遞5w條消息
$dataArr = array_fill(0, 50000, null);

for ($i = 0; $i < 50000; $i++) {
    $dataArr[$i] = ['dummy' => uniqid()];
}

$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);

// QueueMessage方式

for ($i = 0; $i < 50000; $i++) {

    $message = QueueMessageFactory::create(app\queue\test\SendEmail::class, ['dummy' => uniqid()]);

    //$message->setData(json_encode(['123']));
    //$message->setFailCount(1);
    // ....

    $dataArr[$i] = $message;
}

$messageIds = app\queue\test\SendEmail::createQueueProducer()->pushMessages($dataArr);

var_export($messageIds); // 返回Stream消息ID列表 或 false

數(shù)組投遞實(shí)際是通過數(shù)組創(chuàng)建一個(gè)QueueMessage對象

延時(shí)消息

延時(shí)消息的作用:

  1. 定時(shí)任務(wù):
    延時(shí)消息可以用來實(shí)現(xiàn)定時(shí)任務(wù)。例如,你可能想在未來的某個(gè)時(shí)間點(diǎn)執(zhí)行特定操作,如發(fā)送提醒、更新狀態(tài)等。

  2. 延遲處理:
    在某些情況下,立即處理消息并不理想或可能。延時(shí)消息允許應(yīng)用程序延遲處理,直到最合適的時(shí)機(jī)。

  3. 限流:
    延時(shí)消息可以幫助對系統(tǒng)內(nèi)部的請求進(jìn)行限流,防止在短時(shí)間內(nèi)因大量請求而過載。

  4. 解耦和異步處理:
    在復(fù)雜的系統(tǒng)中,延時(shí)消息可以用來解耦不同組件間的直接交互,提高系統(tǒng)的可擴(kuò)展性和維護(hù)性。

通過 scheduleDelayedMessage 方法快速投遞一條延時(shí)消息。

    /**
     * @param mixed|QueueMessageInterface $data
     * @param int $delay
     * @param string $identifier
     * @return bool
     * @throws ScheduleDelayedMessageException
     */
    public function scheduleDelayedMessage(string|array|int|QueueMessageInterface $data, int $delay = 0, string $identifier = ''): bool;

// 消息內(nèi)容
$message = [
    'type' => 'warning',
    'to' => 'xxxx@email.com',
    'content' => '.....'
];

// 投遞一條延時(shí)消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60);

// QueueMessage對象

$message = app\queue\test\SendEmail::createQueueMessage($message);

// 設(shè)置延時(shí)
$message->setDelay(60);

// 投遞一條延時(shí)消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);

// 使用第二個(gè)參數(shù)會(huì)替換之前對象的延時(shí)設(shè)置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message,80);

如果我們想避免消息被重復(fù)發(fā)送等情況,通過延時(shí)隊(duì)列的特性可以很簡單實(shí)現(xiàn)。通過scheduleDelayedMessage
方法的第三個(gè)參數(shù)identifier傳遞一個(gè)自定義的延時(shí)消息ID,同樣的消息ID,消息將會(huì)被替換,延時(shí)時(shí)間從修改開始重新計(jì)算。

如果消息已經(jīng)進(jìn)入stream隊(duì)列將無法實(shí)現(xiàn)替換,必須在延時(shí)時(shí)間內(nèi),類似實(shí)現(xiàn)一個(gè)“防抖”效果,消息在時(shí)間段內(nèi)發(fā)送多次最終只處理一次。


// 消息內(nèi)容
$message = [
    'type' => 'warning',
    'to' => 'xxxx@email.com',
    'content' => '.....'
];

// 通過type,to參數(shù)生成一個(gè)唯一ID
$identifier = md5(serialize([
    'type' => 'warning',
    'to' => 'xxxx@email.com',
]));

// 投遞一條延時(shí)消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 60, $identifier);

// QueueMessage對象

$message = app\queue\test\SendEmail::createQueueMessage($message);

// 設(shè)置延時(shí)
$message->setDelay(60);

// 設(shè)置identifier
$message->setIdentifier($identifier);

// 投遞一條延時(shí)消息 60秒后處理
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message);

// 傳遞參數(shù)會(huì)替換對象之前的延時(shí)和ID設(shè)置
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessage($message, 80, $identifier);

當(dāng)一次需要投遞大量延時(shí)消息時(shí),可以通過scheduleDelayedMessages方法發(fā)送。


// 投遞10w條延時(shí)消息
$dataArr = array_fill(0, 100000, null);

for ($i = 0; $i < 100000; $i++) {
    $dataArr[$i] = [
        'delay' => 2, // 延時(shí)時(shí)間
        'data' => ['dummy' => uniqid()], // 隊(duì)列數(shù)據(jù)
        'identifier' => '' // 自定義ID
    ];
}

// 批量投遞
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);

// QueueMessage對象

for ($i = 0; $i < 100000; $i++) {

    $message = app\queue\test\SendEmail::createQueueMessage(['dummy' => uniqid()]);

    // 設(shè)置延時(shí)
    $message->setDelay(60);

    // 設(shè)置identifier
    $message->setIdentifier('');

    $dataArr[$i] = $message;
}

// 批量投遞
app\queue\test\SendEmail::createQueueProducer()->scheduleDelayedMessages($dataArr);

多redis只需要在隊(duì)列配置connection連接標(biāo)識,投遞方式?jīng)]有任何變化。

移除延時(shí)隊(duì)列消息

新功能 (v1.0.1)

以下功能在插件的 v1.0.1 版本中新增。

有時(shí)候我們想移除某個(gè)或多個(gè)延時(shí)隊(duì)列時(shí),可以使用removeDelayedMessageremoveDelayedMessages
方法實(shí)現(xiàn),使用hasDelayedMessageExistshasDelayedMessagesExist判斷一條或多條延時(shí)消息是否存在。

只有任務(wù)還存在延時(shí)隊(duì)列中才能移除,如果已經(jīng)進(jìn)入Stream隊(duì)列中將無法移除。

    /**
     * @param string $identifier
     * @return bool
     */
    public function removeDelayedMessage(string $identifier): bool;

    /**
     * @param array $identifiers
     * @return bool|array
     */
    public function removeDelayedMessages(array $identifiers): array|bool;

    /**
     * @param string $identifier
     * @return bool
     */
    public function hasDelayedMessageExists(string $identifier): bool;

    /**
     * @param array $identifiers
     * @return bool|array
     */
    public function hasDelayedMessagesExist(array $identifiers): array|bool;

代碼示例:

$consumer = new app\queue\test\SendEmail();

$queueProducer = $consumer::createQueueProducer();

// 添加一條延時(shí)消息 通過業(yè)務(wù)數(shù)據(jù)生成消息ID
$queueProducer->scheduleDelayedMessage(['dummy' => 'ok'], 60, 'email_user_id');

// 通過QueueMessage對象
$queueMessage = $consumer::createQueueMessage(['dummy' => 'ok']);

$queueMessage->setDelay(60);

// 自定義消息ID 不設(shè)置將默認(rèn)生成 通過getIdentifier()獲取
$queueMessage->setIdentifier('test_id');

// 獲取消息ID
$id = $queueMessage->getIdentifier();

// 判斷消息是否存在
var_export(SendEmail::createQueueProducer()->hasDelayedMessageExists('identifier'));  // true or false

// 移除一條延時(shí)消息
var_export(SendEmail::createQueueProducer()->removeDelayedMessage('identifier')); // true or false

// 判斷多條消息是否存在 返回一個(gè)數(shù)組
var_export(SendEmail::createQueueProducer()->hasDelayedMessagesExist(['identifier1', 'identifier1', 'identifier1']));

//.array (
//    0 => 1706383223.0,
//    1 => 1706383223.0,
//    2 => false
//).

// 移除多條延時(shí)消息 返回一個(gè)數(shù)組
var_export(SendEmail::createQueueProducer()->removeDelayedMessages(['identifier1', 'identifier1', 'identifier1']));

//
//.array (
//    0 => 1,
//    1 => 1,
//    2 => 0
//).  

消費(fèi)消息

消費(fèi)消息時(shí)會(huì)調(diào)用消費(fèi)類的consume方法,并傳遞一個(gè)實(shí)現(xiàn)ConsumerMessageInterface接口對象。

<?php

namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendEmail extends Consumer
{
    // 連接標(biāo)識,對應(yīng)redis.php的配置 默認(rèn)default
    protected string $connection = 'default';

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 獲取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 獲取隊(duì)列數(shù)據(jù)
        $data = $consumerMessage->getData();

        // 禁用錯(cuò)誤重試 如果消費(fèi)失敗將不會(huì)異常重試
        $consumerMessage->disableFailRetry();

        // 手動(dòng)觸發(fā)錯(cuò)誤重試,此方法會(huì)調(diào)用disableFailRetry方法,所以后續(xù)報(bào)錯(cuò)不會(huì)再觸發(fā)異常重試。
        // 沒有禁用錯(cuò)誤重試的情況下,消費(fèi)異常默認(rèn)會(huì)調(diào)用此方法。
        $consumerMessage->triggerError(new \Exception('triggerError'));

        // 監(jiān)聽消費(fèi)異常事件
        $consumerMessage->onError(function (\Throwable $e, ConsumerMessageInterface $consumerMessage) {
            // 這里可以處理消費(fèi)異常邏輯

            // 禁用錯(cuò)誤重試
            $consumerMessage->disableFailRetry();

            // 添加日志等等
            // 如果在消費(fèi)方法中自行捕獲 Throwable 此事件不會(huì)觸發(fā)
        });

        // 業(yè)務(wù)邏輯執(zhí)行完畢,ack確認(rèn)消息 默認(rèn)自動(dòng)ack,但通常建議在業(yè)務(wù)邏輯中顯式調(diào)用,比如ack失敗進(jìn)行事務(wù)回滾等等。
        $isAcked = $consumerMessage->ack();

        if (!$isAcked) {

        }

        // 或通過getAckStatus方法獲取結(jié)果
        if (!$consumerMessage->getAckStatus()) {

        }

        // 獲取原始隊(duì)列消息 QueueMessage對象
        $queueMessage = $consumerMessage->getQueueMessage();

        // 獲取消息錯(cuò)誤次數(shù)...
        $failCount = $queueMessage->getFailCount();
        // 更多...
    }
}

上方示例主要演示可調(diào)用的方法,下面使用一個(gè)更加貼合實(shí)際的demo,更快了解消費(fèi)業(yè)務(wù)邏輯的編寫。

發(fā)送郵件驗(yàn)證碼

場景特點(diǎn):獲取驗(yàn)證碼的操作一般由用戶手動(dòng)觸發(fā),在這類場景中,錯(cuò)誤重試應(yīng)用戶在前端UI倒計(jì)時(shí)結(jié)束后重新手動(dòng)發(fā)起,如果業(yè)務(wù)出現(xiàn)崩潰,再次上線后重新發(fā)送驗(yàn)證碼給用戶已經(jīng)沒有意義了。我們可以通過配置適應(yīng)這類場景,代碼示例:

<?php

namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;

class SendEmail extends Consumer
{
    // 連接標(biāo)識,對應(yīng)redis.php的配置 默認(rèn)default
    protected string $connection = 'default';

    // 將pending處理策略調(diào)整為PENDING_PROCESSING_IGNORE 消息掛起超時(shí)將不會(huì)進(jìn)行重試
    protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 獲取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 獲取隊(duì)列數(shù)據(jù)
        $data = $consumerMessage->getData();

        // 監(jiān)聽異常
        $consumerMessage->onError(function (\Throwable $e){

            // 記錄郵件發(fā)送失敗日志
        });

        // 禁用重試
        $consumerMessage->disableFailRetry();

        // 發(fā)送一封郵件 ....

        // 確認(rèn)消息
        $consumerMessage->ack();
    }
}

自定義錯(cuò)誤重試

消費(fèi)類繼承的抽象類Consumer默認(rèn)實(shí)現(xiàn)了handlerFailRetry
方法,在觸發(fā)異常重試時(shí),會(huì)調(diào)用此方法,如果您想自定義錯(cuò)誤重試邏輯,或加入更多自定義的處理,在本插件中可以輕松實(shí)現(xiàn),并且每個(gè)隊(duì)列都支持自定義配置。

/**
     * 處理錯(cuò)誤重試
     * @param $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param Throwable $e
     * @return bool
     * @throws ScheduleDelayedMessageException
     * @throws RedisException
     * @throws Throwable
     */
    public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
    {
        $queueMessage = $consumerMessage->getQueueMessage();

        // 檢查是否超過最大重試次數(shù)
        if ($queueMessage->getFailCount() >= $this->maxAttempts) {
            // 死信處理
            $this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
            return true;
        }

        $queueMessage->incrementFailCount(); // Fail count + 1

        // 計(jì)算下次重試時(shí)間
        $retrySeconds = $queueMessage->getFailCount() * $this->retrySeconds;

        // 更新下次重試時(shí)間
        $queueMessage->updateNextRetry($retrySeconds);

        // 設(shè)置消息延時(shí)
        $queueMessage->setDelay($retrySeconds);

        // 設(shè)置消息ID 避免重復(fù)任務(wù)
        $queueMessage->setIdentifier($messageId);

        // 重新發(fā)布至延時(shí)隊(duì)列
        return self::createQueueProducer()->scheduleDelayedMessage($queueMessage);
    }

默認(rèn)實(shí)現(xiàn)的代碼如上,我們只需要重寫此方法就可以自定義錯(cuò)誤處理的業(yè)務(wù)邏輯。

代碼示例:

<?php

namespace app\queue\test;

use SolarSeahorse\WebmanRedisQueue\Consumer;
use SolarSeahorse\WebmanRedisQueue\Interface\ConsumerMessageInterface;
use Throwable;

class SendEmail extends Consumer
{
    // 連接標(biāo)識,對應(yīng)redis.php的配置 默認(rèn)default
    protected string $connection = 'default';

    // 將pending處理策略調(diào)整為PENDING_PROCESSING_IGNORE 消息掛起超時(shí)將不會(huì)進(jìn)行重試
    protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;

    public function consume(ConsumerMessageInterface $consumerMessage)
    {
        // TODO: Implement consume() method.

        // 獲取消息ID
        $messageId = $consumerMessage->getMessageId();

        // 獲取隊(duì)列數(shù)據(jù)
        $data = $consumerMessage->getData();

        // 監(jiān)聽異常
        $consumerMessage->onError(function (\Throwable $e){

            // 記錄郵件發(fā)送失敗日志
        });

        // 禁用重試
        $consumerMessage->disableFailRetry();

        // 發(fā)送一封郵件 ....

        // 確認(rèn)消息
        $consumerMessage->ack();
    }

    public function handlerFailRetry($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): bool
    {
        // 不改動(dòng)原本的錯(cuò)誤處理 也可以完全自定義實(shí)現(xiàn)。
        parent::handlerFailRetry($messageId, $consumerMessage, $e);

       // 如果隊(duì)列在業(yè)務(wù)數(shù)據(jù)庫中還有一個(gè)tasks表進(jìn)行調(diào)度,在這里可以更新task數(shù)據(jù) 比如 錯(cuò)誤次數(shù)+1
    }
}

自定義死信處理

handlerFailRetry方法中,默認(rèn)有這一段:


// 檢查是否超過最大重試次數(shù)
if ($queueMessage->getFailCount() >= $this->maxAttempts) {
    // 死信處理
    $this->handlerDeadLetterQueue($messageId, $consumerMessage, $e);
    return true;
}

那么,我們?nèi)绻枰远x死信處理或加入額外的業(yè)務(wù)邏輯可以通過重寫handlerDeadLetterQueue方法實(shí)現(xiàn)。

protected int $pendingProcessingStrategy = self::PENDING_PROCESSING_IGNORE;
當(dāng)我們設(shè)置pending處理策略為PENDING_PROCESSING_IGNORE
時(shí),消息如果掛起超時(shí),將不會(huì)觸發(fā)異常重試,而是直接調(diào)用死信處理。默認(rèn)情況下,死信處理會(huì)新增一條日志,方便排查問題。

默認(rèn)情況下需要配置有效的日志(log.php) 默認(rèn)行為才有效。也可以通過重寫方法完全自行實(shí)現(xiàn),記錄在業(yè)務(wù)的數(shù)據(jù)庫中,這也是推薦的做法,可以針對業(yè)務(wù)實(shí)現(xiàn)更加靈活的異常處理。

    /**
     * 處理死信 超過最大重試次數(shù)或pending超時(shí)PENDING_PROCESSING_IGNORE策略 會(huì)調(diào)用此方法
     * @param $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param Throwable $e
     * @return void
     */
    public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
    {
        $queueMessage = $consumerMessage->getQueueMessage();

        // 添加日志
        LogUtility::warning('dead_letter_queue: ', [
            'messageId' => $messageId,
            'message' => $queueMessage->toArray(),
            'failCount' => $queueMessage->getFailCount(),
            'errorMsg' => $e->getMessage(),
            'trace' => $e->getTraceAsString()
        ]);

        // 更多...
    }

代碼示例:

public function handlerDeadLetterQueue($messageId, ConsumerMessageInterface $consumerMessage, Throwable $e): void
{
    // 保持默認(rèn)行為
    parent::handlerDeadLetterQueue($messageId, $consumerMessage, $e); // TODO: Change the autogenerated stub

    // 如果隊(duì)列在業(yè)務(wù)數(shù)據(jù)庫中還有一個(gè)tasks表進(jìn)行調(diào)度,在這里可以更新task數(shù)據(jù)
}

自定義pending超時(shí)處理

抽象類Consumer中,默認(rèn)定義了handlerPendingTimeoutMessages方法,用于處理pending超時(shí)的消息。

消費(fèi)者讀取了一條消息后,消息會(huì)進(jìn)入pending
列表,不會(huì)被當(dāng)前和其他消費(fèi)者再次讀取,當(dāng)業(yè)務(wù)邏輯沒有執(zhí)行完畢,服務(wù)出現(xiàn)掉線,崩潰時(shí),消息并沒有ack,消息會(huì)一直保存在pending
列表中,pending列表只能通過ack移除,如果長期不處理,可能造成pending
列表堆積,造成大量內(nèi)存占用,當(dāng)持續(xù)時(shí)間大于$pendingTimout屬性的時(shí)間(默認(rèn)300秒),會(huì)調(diào)用此方法進(jìn)行處理。

默認(rèn)情況下,在PENDING_PROCESSING_IGNORE策略中,我們認(rèn)為pending超時(shí)消息是死信,不會(huì)再次處理,PENDING_PROCESSING_RETRY
會(huì)進(jìn)行異常重試。


     /**
     * 處理消息掛起超時(shí) 當(dāng)pending列表中有超時(shí)未ack的消息會(huì)觸發(fā)此方法
     * @param string $messageId
     * @param ConsumerMessageInterface $consumerMessage
     * @param string $consumerName
     * @param int $elapsedTime
     * @param int $deliveryCount
     * @return void
     * @throws RedisException
     * @throws ScheduleDelayedMessageException
     * @throws Throwable
     */
    public function handlerPendingTimeoutMessages(string $messageId, ConsumerMessageInterface $consumerMessage, string $consumerName, int $elapsedTime, int $deliveryCount): void
    {
        switch ($this->getPendingProcessingStrategy()) {
            case self::PENDING_PROCESSING_IGNORE: // 忽略pending超時(shí)

                // 確認(rèn)消息
                $consumerMessage->ack();

                // 觸發(fā)死信處理
                $this->handlerDeadLetterQueue($messageId, $consumerMessage, new Exception(
                    'PENDING_PROCESSING_IGNORE: Message pending timeout.'
                ));
                break;
            case self::PENDING_PROCESSING_RETRY: // pending超時(shí)重試

                // 觸發(fā)死信處理
                if ($deliveryCount + 1 > $this->getMaxAttempts()) {

                    // ack消息
                    $consumerMessage->ack();

                    $this->handlerDeadLetterQueue(
                        $messageId,
                        $consumerMessage,
                        new Exception(
                            'PENDING_PROCESSING_RETRY: The number of message delivery times exceeds the maximum number of retries.'
                        ));

                    return;
                }

                // 處理重試
                $handlerStatus = $this->handlerFailRetry(
                    $messageId,
                    $consumerMessage,
                    new Exception('PENDING_PROCESSING_RETRY: Message pending timeout retry.')
                );

                if ($handlerStatus) {
                    $consumerMessage->ack();
                }
                break;
        }
    }

獲取隊(duì)列的redis連接

有時(shí)候我們需要操作或維護(hù)隊(duì)列時(shí),可以直接獲取隊(duì)列的Redis連接進(jìn)行操作,比如編寫自定義腳本等。


// 獲取隊(duì)列的Redis連接
$sendCode = new app\queue\test\SendCode();

$redisConnection = $sendCode->getRedisConnection();

// 使用方法和phpredis擴(kuò)展一致

$redisConnection->xLen();

$redisConnection->sAdd();

// 在消費(fèi)類中可以直接使用$this->getRedisConnection();

....更多

命令行

php webman solar:make:consumer

  • 創(chuàng)建一個(gè)消費(fèi)者
  • 它將引導(dǎo)你創(chuàng)建一個(gè)基本的消費(fèi)者類

php webman solar:remove:consumer

  • 移除一個(gè)消費(fèi)者
  • 它將引導(dǎo)你移除消費(fèi)者類
  • 注意:它會(huì)移除redis中關(guān)于此消費(fèi)者的所有數(shù)據(jù),如果你只是想移除類和配置,請不要使用此命令。

php webman solar:clean:redis:data

  • 清理某個(gè)消費(fèi)者的Redis數(shù)據(jù)
  • 它將引導(dǎo)你清理redis數(shù)據(jù)
  • 注意:它將刪除redis中關(guān)于此消費(fèi)者的所有數(shù)據(jù),請謹(jǐn)慎操作。

php webman solar:consumer:list

  • 獲取當(dāng)前全部消費(fèi)者信息,包含如下信息:
  • Key 標(biāo)識
  • Handler 進(jìn)程類
  • Count 進(jìn)程數(shù)
  • Consumer 消費(fèi)者類名
  • Stream Length 當(dāng)前隊(duì)列總長度(不包含Pending列表中的數(shù)量)
  • Delay Set Length 當(dāng)前延時(shí)隊(duì)列任務(wù)數(shù)
  • Pending List Length 當(dāng)前Pending列表長度
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
| Key         | Handler                                                | Count | Consumer                 | Stream Length | Delay Set Length | Pending List Length |
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+
| SendCode    | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 20    | app\queue\test\SendCode  | 1996          | 950              | >=500               |
| SendEmail   | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 20    | app\queue\test\SendEmail | 0             | 0                | 0                   |
| SendSmsCode | SolarSeahorse\WebmanRedisQueue\Process\ConsumerProcess | 1     | app\queue\SendSmsCode    | 0             | 0                | 0                   |
+-------------+--------------------------------------------------------+-------+--------------------------+---------------+------------------+---------------------+

處理歷史消息

使用場景:

  1. 在極端情況下業(yè)務(wù)執(zhí)行完畢并且ack成功,但是刪除消息時(shí)出現(xiàn)異常,消息保留在stream中,一般少量數(shù)據(jù)時(shí)我們無需在意,但如果堆積數(shù)量過大可能造成內(nèi)存占用和性能問題。
  2. 當(dāng)你需要處理歷史消息,或者重新處理之前已經(jīng)處理過的消息。
  3. 當(dāng)你需要對 Stream 中的歷史數(shù)據(jù)進(jìn)行分析或生成報(bào)告。

當(dāng)autoDel屬性為true
時(shí),消息會(huì)自動(dòng)刪除,無法對歷史數(shù)據(jù)進(jìn)行處理和分析,如果業(yè)務(wù)需要對歷史隊(duì)列消息進(jìn)行回溯請?jiān)O(shè)置為false

代碼示例:

這里我們使用了webman自定義腳本
的編寫,可以將腳本加入定時(shí)任務(wù)中,定期清理或處理歷史消息。

下方代碼只是示例,請確保在測試環(huán)境充分測試。


<?php

use SolarSeahorse\WebmanRedisQueue\Queue\QueueMessage;

require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/../support/bootstrap.php';

// 獲取隊(duì)列的Redis連接
$sendCode = new app\queue\test\SendCode();

$redisConnection = $sendCode->getRedisConnection();

// 使用方法和phpredis擴(kuò)展一致
$streamKey = $sendCode->getStreamKey();
$start = '-'; // 表示從 Stream 的最開始讀取
$end = '+';   // 表示讀取到 Stream 的最末尾
$count = 100;  // 指定要讀取的消息數(shù)量

// 讀取Stream列表,不包括pending
$messages = $redisConnection->xRange($streamKey, $start, $end, $count);

$deleteMessageIds = [];

foreach ($messages as $messageId => $message) {

    // 解析原始消息內(nèi)容
    $messageArr = QueueMessage::parseRawMessage($message);

    if (!$messageArr) { // 未知消息
        $deleteMessageIds[] = $messageId;
        continue;
    }

    // 轉(zhuǎn)換為QueueMessage方便操作
    $queueMessage = QueueMessage::createFromArray($messageArr);

    // 通過獲取消息時(shí)間戳,如果消息已經(jīng)存在超過1個(gè)小時(shí) 標(biāo)記刪除。
    if (time() - $queueMessage->getTimestamp() > 3600) {
        $deleteMessageIds[] = $messageId;
    }

}

// 批量刪除消息
$redisConnection->xDel($streamKey, $deleteMessageIds);

在其他項(xiàng)目投遞消息

目前插件沒有實(shí)現(xiàn)在其他項(xiàng)目投遞的標(biāo)準(zhǔn)實(shí)現(xiàn),可通過業(yè)務(wù)需求開發(fā)隊(duì)列提交接口實(shí)現(xiàn)。

贊助商