我在使用該項目過程中,曾出現(xiàn)過timer無限制遞增的情況,也出現(xiàn)過服務(wù)端主動踢出連接無法消費等問題,遂自己重寫了一個amqp客戶端;
后來我回過頭觀察分析workerman/rabbitmq源碼的時候,發(fā)現(xiàn)了一些可以被建議的地方:
Client.php 160 - 170 行位置已經(jīng)創(chuàng)建了一個持續(xù)的定時器
})->then(function () {
$this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, true);
$this->state = ClientStateEnum::CONNECTED;
return $this;
});
Client.php 226 - 243 行位置依然在反復(fù)創(chuàng)建一次性的定時器
public function onHeartbeat()
{
$now = microtime(true);
$nextHeartbeat = ($this->lastWrite ?: $now) + $this->options["heartbeat"];
if ($now >= $nextHeartbeat) {
$this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer()->done(function () {
$this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, false);
});
if (is_callable($this->options['heartbeat_callback'] ?? null)) {
$this->options['heartbeat_callback']->call($this);
}
} else {
$this->heartbeatTimer = Timer::add($nextHeartbeat - $now, [$this, "onHeartbeat"], null, false);
}
}
這里我理解是為了判斷時間差,更精確的進行觸發(fā)回調(diào),但是我個人認為本身workerman的定時器是精確到毫秒的,這里沒有必要再做冗余的判斷,直接調(diào)用即可,代碼如下:
public function onHeartbeat(): void
{
$this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer()->then(
function () {
if (is_callable(
isset($this->options['heartbeat_callback'])
? $this->options['heartbeat_callback']
: null
)) {
// 這里我并沒有沿用bunny的回調(diào)觸發(fā)方式,而是自己寫了一個,如有需要,將該方法改回bunny方法就好了
($this->options['heartbeat_callback'])($this);
// $this->options['heartbeat_callback']->call($this);
}
},
function (\Throwable $throwable){
if($this->log){
$this->log->notice(
'OnHeartbeatFailed',
[
$throwable->getMessage(),
$throwable->getCode(),
$throwable->getFile(),
$throwable->getLine()
]
);
}
AbstractProcess::kill("OnHeartbeatFailed-{$throwable->getMessage()}");
});
}
AbstractProcess::kill()這個方法實際上是一個簡單的殺死當前進程的方法,因為bunny的客戶端使用的是異步promise的執(zhí)行方式,在遇到錯誤的時候會調(diào)用then中的onRejected,我認為,在一定情況下如果心跳失敗了,會影響當前鏈接的活性,隨之會被服務(wù)端踢出,但客戶端并沒有完善的重連機制,就造成了假死,所以我在這個位置加入殺死當前進程的方法,讓workerman的主進程重新拉起一個進程,該進程也會重新連接,重新處理和消費,不會影響工作流,kill方法的代碼如下:
public static function kill(?string $log = null)
{
if(self::$_masterPid === ($pid = posix_getpid())){
self::stopAll(SIGKILL, $log);
}else{
self::log("(pid:{$pid}) {$log}");
posix_kill($pid, SIGKILL);
}
}
如上述所說的,在Client.php 182 - 220的disconnect方法中也沒有重連的方案,在使用rabbitmq的管理后臺將該鏈接斷開后,該進程就始終保持了一個僵尸進程的角色,無法退出也無法消費,所以我建議改進如下:
public function disconnect($replyCode = 0, $replyText = '') : Promise\PromiseInterface
{
if ($this->state === ClientStateEnum::DISCONNECTING) {
return $this->disconnectPromise;
}
if ($this->state !== ClientStateEnum::CONNECTED) {
return Promise\reject(new ClientException("Client is not connected."));
}
$this->state = ClientStateEnum::DISCONNECTING;
$promises = [];
if ($replyCode === 0) {
foreach ($this->channels as $channel) {
$promises[] = $channel->close($replyCode, $replyText);
}
}
else{
foreach($this->channels as $channel){
$this->removeChannel($channel->getChannelId());
}
}
if ($this->heartbeatTimer) {
Timer::del($this->heartbeatTimer);
$this->heartbeatTimer = null;
}
return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
if (!empty($this->channels)) {
throw new \LogicException("All channels have to be closed by now.");
}
if($replyCode !== 0){
return null;
}
return $this->connectionClose($replyCode, $replyText, 0, 0);
})->then(function () use ($replyCode, $replyText){
$this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
$this->closeStream();
$this->init();
if($replyCode !== 0){
// 殺死當前進程,交主進程重啟
AbstractProcess::kill("{$replyCode}-{$replyText}");
}
return $this;
});
}
最后,我是非常喜歡workman及相關(guān)的生態(tài)組件的,本意是想直接使用workerman生態(tài)相關(guān)的組件,但當時我所處的項目上線非常急迫,所以拋開了workerman/rabbitmq自行寫了一套casual/amqp,我個人希望workerman能夠越來越好,加油;
casual/amqp項目地址:https://github.com/chaz6chez/simple-amqp
非常感謝你的建議!
如果可以的話,請給 workerman/rabbitmq
發(fā)個pr。
業(yè)務(wù)代碼都是運行在子進程,所以不用判斷master_pid,workerman重啟當前進程直接調(diào)用 Worker::stopAll();
就行,也可以用 posix_kill(posix_getpid(), SIGINT);
我打算來長期幫忙維護workerman/rabbitmq這個項目了,我把rabbitmq引入我的casual/amqp這個項目使用了,目前是在生產(chǎn)環(huán)境運行的
接上述,如果Workerman能夠提供一個子進程重啟的命令或者方法的話,是極好的,目前我是用AbstractProcess繼承Worker,然后增加的一個kill方法,但是也是使用了KILL的信號,不知道有沒有其他更優(yōu)雅的信號或者方法可以使用呢?