我簡化了下代碼,大致如下:
define('MAX_REQUEST', 5000);
$worker = new Worker('http://0.0.0.0:端口');
$worker->onWorkerStart = function($worker)
{
global $mqtt;
$mqtt = new Workerman\Mqtt\Client($url, $options);
$mqtt->onConnect = function($mqtt) {
};
$mqtt->connect();
};
$worker->onMessage = function(TcpConnection $connection, $args)
{
global $mqtt;
// 已經(jīng)處理請求數(shù)
static $request_count = 0;
$data = $args->post();
// 此處報錯
$mqtt->publish($topic, json_encode($data));
$connection->send('Status Code: 200 OK');
// 如果請求數(shù)達到5000
if(++$request_count >= MAX_REQUEST)
{
$mqtt->close();
Worker::stopAll();
}
};
Worker::runAll();
業(yè)務(wù)大概是:需要一個http服務(wù)端用來接收http的請求,拿到數(shù)據(jù)經(jīng)過處理以后,在用mqtt進行推送。服務(wù)端啟動時不會報錯,但是當(dāng)進程達到設(shè)定好的5000次執(zhí)行關(guān)閉并重新開啟的時候,由于mqtt還沒連接上,就收到http請求并且執(zhí)行mqtt->publish()了,導(dǎo)致mqtt報錯,No connection to broker,請問大佬,有什么方法可以避免這個問題嗎?
收到5000請求就退出這個邏輯感覺沒什么必要。
如果想保留現(xiàn)有5000請求退出邏輯,就要做一個內(nèi)存緩存,當(dāng)mqtt沒連接成功時,收到的請求放到內(nèi)存緩存起來。當(dāng)mqtt連接上后將緩存的數(shù)據(jù)發(fā)給mqtt。
偽代碼類似
define('MAX_REQUEST', 5000);
$worker = new Worker('http://0.0.0.0:端口');
$queue = [];
$mqttConnected = false;
$worker->onWorkerStart = function($worker)
{
global $mqtt;
$mqtt = new Workerman\Mqtt\Client($url, $options);
$mqtt->onConnect = function($mqtt) {
global $mqttConnected, $queue;
$mqttConnected = true;
foreach ($queue as $topic => $data) {
$mqtt->publish($topic, $data);
}
$queue = [];
};
$mqtt->connect();
};
$worker->onMessage = function(TcpConnection $connection, $args)
{
global $mqtt, $mqttConnected, $queue;
// 已經(jīng)處理請求數(shù)
static $request_count = 0;
$data = $args->post();
// 此處報錯
if ($mqttConnected) {
$mqtt->publish($topic, json_encode($data));
} else {
$queue[$topic][] = json_encode($data);
}
$connection->send('Status Code: 200 OK');
// 如果請求數(shù)達到5000
if(++$request_count >= MAX_REQUEST)
{
$mqtt->close();
Worker::stopAll();
}
};
Worker::runAll();
我覺得$mqttConnected === false時,將 $connection->send放到$mqtt->onConnect 或者onError/onClose里,new Workerman\Mqtt\Client加上超時時間。這樣基本上可以做到http響應(yīng)和mqtt服務(wù)響應(yīng)同步。
大佬,我后來把5000請求去掉了,最近項目運行時遇到個問題,還是mqtt的,debug如下:
-> Send PINGREQ package
-- Error: Connection closed4
-- Connection closed
-- Reconnect after 1 seconds
-- Error: No connection to broker5
-- Tcp connection established
是心跳沒收到回復(fù)然后把鏈接關(guān)掉了?但是重連以后,又報錯沒連上broker,漏發(fā)消息了,還請大佬幫忙分析解決下,感謝
可能是服務(wù)端沒回應(yīng)心跳連接斷開了。publish有失敗回調(diào),失敗的消息放到全局數(shù)組里,等$mqtt->onConnect觸發(fā)時再依次發(fā)送。