啟 worker,在onWorkerStart 開啟定時器 1秒獲取 Redis 隊列數(shù)據(jù)。
從log中看,消費(fèi)了隊列數(shù)據(jù),偶現(xiàn)會沒有執(zhí)行。
沒有用workerman-queue
$asyncJobWorker->onWorkerStart = function () use ($client) {
// 獲取配置
foreach (Yii::$app->service->workermanQueue->jobConfig() as $config) {
// 訂閱隊列名,回調(diào)方案使用配置的 handler execute 來處理數(shù)據(jù)
// 這里的 回調(diào),如果返回 true, 運(yùn)行后會從 queue 中刪除,否則 不刪除
$client->subscribe($config['name'], function ($data, $jobId, $ttr, $attempt) use ($config) {
try {
// 可以理解為注入
$handler = Yii::createObject([
'class' => $config['handler'],
'task' => $data
]);
/** @var $handler JobInterface */
return $handler->execute(null);
} catch (\Exception $e) {
Yii::error('workerman queue exception:' . $e->getTraceAsString(), 'error');
return false;
}
});
}
};
/**
* 任務(wù)配置
* @return \string[][]
* @desc 每次修改 需要 重啟 workerman-queue
*/
public function jobConfig()
{
return [
['name' => self::ASYNC_JOB_QUEUE_NAME, 'handler' => AsyncJob::class],
];
}
AsyncJob excute
public function execute($queue)
{
try {
$task = $this->task;
if (isset($task['type'])) {
// 根據(jù) 任務(wù)里面的 type 獲取執(zhí)行的方法
$functionName = lcfirst(Inflector::id2camel($task['type'], '_') . 'Execute');
$log = new AsyncJobLog();
$log->execute_function = $functionName;
$log->task = json_encode($task, JSON_UNESCAPED_UNICODE);
$log->save(false);
// 偶現(xiàn) 下面的不執(zhí)行了
// 難道是 $this->$functionName($task); 要改成 $returnVal = call_user_func([$this, $functionName], $task);?
$returnVal = $this->$functionName($task);
if ($returnVal === true) {
$log->is_finish = 1;
} else {
$log->is_finish = 0;
$log->task_print = $returnVal;
}
$log->save(false);
} else {
Util::errorHandle(new \Exception('Async Job Execute:(' . json_encode($task, JSON_UNESCAPED_UNICODE) . ') type Not set!'));
}
} catch (\Exception $e) {
Util::errorHandle($e);
}
return true;
}