国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

請問如何做到接收http請求,推送mqtt消息?

vipbressanon

我簡化了下代碼,大致如下:


define('MAX_REQUEST', 5000);

$worker = new Worker('http://0.0.0.0:端口');

$worker->onWorkerStart = function($worker)
{
    global $mqtt;

    $mqtt = new Workerman\Mqtt\Client($url, $options);
    $mqtt->onConnect = function($mqtt) {

    };
    $mqtt->connect();
};

$worker->onMessage = function(TcpConnection $connection, $args)
{
    global $mqtt;
    // 已經(jīng)處理請求數(shù)
    static $request_count = 0;

    $data = $args->post();

    // 此處報錯
    $mqtt->publish($topic, json_encode($data));

    $connection->send('Status Code: 200 OK');

    // 如果請求數(shù)達到5000
    if(++$request_count >= MAX_REQUEST)
    {
        $mqtt->close();
        Worker::stopAll();
    }
};

Worker::runAll();

業(yè)務(wù)大概是:需要一個http服務(wù)端用來接收http的請求,拿到數(shù)據(jù)經(jīng)過處理以后,在用mqtt進行推送。服務(wù)端啟動時不會報錯,但是當(dāng)進程達到設(shè)定好的5000次執(zhí)行關(guān)閉并重新開啟的時候,由于mqtt還沒連接上,就收到http請求并且執(zhí)行mqtt->publish()了,導(dǎo)致mqtt報錯,No connection to broker,請問大佬,有什么方法可以避免這個問題嗎?

1686 1 2
1個回答

walkor 打賞

收到5000請求就退出這個邏輯感覺沒什么必要。

如果想保留現(xiàn)有5000請求退出邏輯,就要做一個內(nèi)存緩存,當(dāng)mqtt沒連接成功時,收到的請求放到內(nèi)存緩存起來。當(dāng)mqtt連接上后將緩存的數(shù)據(jù)發(fā)給mqtt。

偽代碼類似

define('MAX_REQUEST', 5000);

$worker = new Worker('http://0.0.0.0:端口');

$queue = [];
$mqttConnected = false;

$worker->onWorkerStart = function($worker)
{
    global $mqtt;

    $mqtt = new Workerman\Mqtt\Client($url, $options);
    $mqtt->onConnect = function($mqtt) {
        global $mqttConnected, $queue;
        $mqttConnected = true;
        foreach ($queue as $topic => $data) {
            $mqtt->publish($topic, $data);
        }
        $queue = [];
    };
    $mqtt->connect();
};

$worker->onMessage = function(TcpConnection $connection, $args)
{
    global $mqtt, $mqttConnected, $queue;
    // 已經(jīng)處理請求數(shù)
    static $request_count = 0;

    $data = $args->post();

    // 此處報錯
    if ($mqttConnected) {
        $mqtt->publish($topic, json_encode($data));
    } else {
        $queue[$topic][] = json_encode($data);
    }

    $connection->send('Status Code: 200 OK');

    // 如果請求數(shù)達到5000
    if(++$request_count >= MAX_REQUEST)
    {
        $mqtt->close();
        Worker::stopAll();
    }
};

Worker::runAll();
  • 小W 2023-01-30

    我覺得$mqttConnected === false時,將 $connection->send放到$mqtt->onConnect 或者onError/onClose里,new Workerman\Mqtt\Client加上超時時間。這樣基本上可以做到http響應(yīng)和mqtt服務(wù)響應(yīng)同步。

  • 小W 2023-01-30

    $connection->send內(nèi)容也是根據(jù)mqtt狀態(tài)不同而不同

  • vipbressanon 2023-04-24

    大佬,我后來把5000請求去掉了,最近項目運行時遇到個問題,還是mqtt的,debug如下:
    -> Send PINGREQ package
    -- Error: Connection closed4
    -- Connection closed
    -- Reconnect after 1 seconds
    -- Error: No connection to broker5
    -- Tcp connection established
    是心跳沒收到回復(fù)然后把鏈接關(guān)掉了?但是重連以后,又報錯沒連上broker,漏發(fā)消息了,還請大佬幫忙分析解決下,感謝

  • walkor 2023-04-24

    可能是服務(wù)端沒回應(yīng)心跳連接斷開了。publish有失敗回調(diào),失敗的消息放到全局數(shù)組里,等$mqtt->onConnect觸發(fā)時再依次發(fā)送。

  • vipbressanon 2023-04-25

    感謝大佬~

年代過于久遠,無法發(fā)表回答
??