使用workerman的redis-queue,日志里面會出現(xiàn)多次消費(fèi)記錄,望大佬們幫忙看下
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
//日志
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
require_once __DIR__ . '/../function.php';
$consumer = new Worker();
$consumer->count = 8;
$consumer->onWorkerStart = function($consumer){
global $db, $config, $logger;
//數(shù)據(jù)庫鏈接
$db = new \Workerman\MySQL\Connection(
$config['database']['mysql']['host'], //數(shù)據(jù)庫鏈接
$config['database']['mysql']['port'], //數(shù)據(jù)庫端口
$config['database']['mysql']['user'], //數(shù)據(jù)庫用戶
$config['database']['mysql']['pass'], //數(shù)據(jù)庫密碼
$config['database']['mysql']['name'] //數(shù)據(jù)庫名字
);
//隊列鏈接
$client = new Client($config['redis']['host'], [
'auth' => $config['redis']['password']
]);
//日志
$logger = new Logger('xianyu');
//訂閱
$client->subscribe('xianyu', function($data){
global $logger, $config;
$logger->pushHandler(new StreamHandler($config['log']['path'] . date('Y/m/') . date('d') . '_xianyu.log'));
// 要調(diào)用的類名,加上Consumer命名空間
$class_name = "\\Consumer\\Xianyu";
// 要調(diào)用的方法名
$method = isset($data['method']) ? $data['method'] : '';
if(class_exists($class_name)){
$class = new $class_name;
$callback = [$class, $method];
if(is_callable($callback)){
call_user_func_array($callback, ['data' => $data['data']]);
}else{
$logger->error("$class_name::$method not exist\n");
}
}else{
$logger->error("不存在\n");
}
});
};
// 如果不是在根目錄啟動,則運(yùn)行runAll方法
if(!defined('GLOBAL_START')){
Worker::runAll();
}
Workerman version:4.1.13 PHP version:8.0.20
消費(fèi)失敗(拋出異常)時,消息會放到延遲隊列等待重試,會再次嘗試消費(fèi)。
還有就是入隊列時就重復(fù)了,從你的日志來看像是重復(fù)入隊列了,因為時間間隔不像是消費(fèi)失敗重試。
感謝大佬回答,我也懷疑過是入隊列重復(fù)的問題,然后就手動調(diào)用的入隊列的方法,如果特別頻繁的調(diào)用入隊列,重復(fù)的次數(shù)就越來越多
public function test(){
workerQueue('xianyu', 'send', ['ticket_id' => 11111]);
}
function workerQueue($queue, $method, $data, $delay = 0){
$config = config('cache.stores.redis');
$redis = new \Redis;
$redis->connect($config['host'], $config['port']);
$redis->auth($config['password']);
$data = [
'method' => $method,
'data' => $data
];
$delay = 0;
$queue_waiting = '{redis-queue}-waiting'; //1.0.5版本之前為redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';//1.0.5版本之前為redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
消費(fèi)隊列處理的方法,會不會是這里的問題
public function send($data = []){
global $db, $config, $logger;
try{
//業(yè)務(wù)邏輯,如果有問題記錄日志
throw new \Exception('沒有查找到可售票檔');
$logger->info('同步成功:'. json_encode($ticket));
}catch(\Exception $e){
$logger->error('同步失?。?. $e->getMessage());
}
}