国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

WebSocket 當(dāng)前連接收到了不屬于它的訂閱數(shù)據(jù)

程序玩

問(wèn)題描述

WebSocket 當(dāng)前連接收到了不屬于它的訂閱數(shù)據(jù),新手,不知道是變量污染還是其他問(wèn)題,求兄弟們幫忙看一下。

為此你搜索到了哪些方案及不適用的原因

protected $socket = 'websocket://0.0.0.0:8090';

protected $clients = [];

/**
 * 連接成功事件
 * @param $connection
 */
public function onConnect($connection)
{
    $connection->id = $this->getUniqueConnectionId($connection);
    // 為每個(gè)連接初始化一個(gè)訂閱頻道的數(shù)組
    $this->clients[$connection->id] = [
        'connection' => $connection,
        'subscriptions' => [], // 存儲(chǔ)訂閱的頻道信息
        'timerId' => ''
    ];
}

/**
 * 接收消息事件
 * @param $connection
 * @param $data
 */
public function onMessage($connection, $data)
{
    // 解析接收到的數(shù)據(jù)
    $message = json_decode($data, true);

    // 檢查解析結(jié)果是否為null,即是否解析失敗
    if ($message === null) {
        $connection->send('Failed to decode JSON data.'); // 發(fā)送解析失敗響應(yīng)
        return;
    }

    if (!isset($message['type'])) {
        return;
    }

    $queryId = Random::uuid();
    $queryParam['symbol_id'] = $message['symbol_id'] ?? null;
    $queryParam['channel']   = $message['channel'] ?? null;

    // 現(xiàn)在 $message 包含解析后的數(shù)組數(shù)據(jù),您可以繼續(xù)處理它
    switch ($message['type']) {
        case 'subscribe':
            // 訂閱消息
            $this->subscribe($connection, $queryParam);
            break;
        case 'unsubscribe':
            // 取消訂閱消息
            $this->unsubscribe($connection, $queryParam);
            break;
        case 'pong':
            // 收到客戶(hù)端的pong響應(yīng)
            break;
        default:
            // 其他消息類(lèi)型,可以根據(jù)需求進(jìn)行處理
            break;
    }

}

// 加入頻道
protected function subscribe($connection, $queryParam)
{
    // 停止之前的定時(shí)器
    $this->clearTimer($connection->id);
    // 添加訂閱信息到連接中
    $this->clients[$connection->id]['subscriptions'][] = $queryParam;

    // 創(chuàng)建一個(gè)5秒后發(fā)送數(shù)據(jù)的定時(shí)器
    $timerId = Timer::add(5, function () use ($connection, $queryParam) {
        // 獲取當(dāng)前連接的訂閱信息
        $subscriptions = $this->clients[$connection->id]['subscriptions'];

        // 檢查當(dāng)前連接是否訂閱了當(dāng)前頻道
        $isSubscribed = false;
        foreach ($subscriptions as $subscription) {
            if ($subscription['channel'] === $queryParam['channel'] && $subscription['symbol_id'] === $queryParam['symbol_id']) {
                $isSubscribed = true;
                break;
            }
        }

        if ($isSubscribed) {
            // 發(fā)送數(shù)據(jù)給當(dāng)前連接
            $queryResult = $this->getDataFromCache($queryParam['channel'], $queryParam['symbol_id']);
            if (!empty($queryResult)) {
                $connection->clearBuffer();
                $connection->send(json_encode($queryResult));
            }
        }
    }, [], true);

    // 記錄定時(shí)器ID
    $this->clients[$connection->id]['timerId'] = $timerId;
}

// 檢查連接是否訂閱了指定的頻道
protected function isSubscribed($connection, $channel, $symbol_id)
{
    $subscriptions = $this->clients[$connection->id]['subscriptions'];
    foreach ($subscriptions as $subscription) {
        if ($subscription['channel'] === $channel && $subscription['symbol_id'] === $symbol_id) {
            return true;
        }
    }
    return false;
}

// 取消頻道時(shí)移除連接及其訂閱信息
protected function unsubscribe($connection, $queryParam)
{
    $subscriptions = &$this->clients[$connection->id]['subscriptions'];
    foreach ($subscriptions as $key => $subscription) {
        if ($subscription['channel'] === $queryParam['channel'] && $subscription['symbol_id'] === $queryParam['symbol_id']) {
            unset($subscriptions[$key]);
            break;
        }
    }
}

/**
 * 心跳事件
 * @param $connection
 */
public function onHeartbeat($connection)
{
    // 可以在這里處理心跳邏輯,例如檢查連接是否存活等
}

/**
 * 連接關(guān)閉事件
 * @param $connection
 */
public function onClose($connection)
{
    $this->clearTimer($connection->id);
    unset($this->clients[$connection->id]);
}

/**
 * 清除連接的定時(shí)器
 * @param $connectionId
 */
protected function clearTimer($connectionId)
{
    if (isset($this->clients[$connectionId]['timerId'])) {
        Timer::del($this->clients[$connectionId]['timerId']);
    }
}

/**
 * 獲取唯一的連接ID,可以添加worker ID前綴
 * @param $connection
 * @return string
 */
protected function getUniqueConnectionId($connection)
{
    return $connection->worker->id . '_' . $connection->id;
}

// 從緩存中獲取數(shù)據(jù)
protected function getDataFromCache($channel, $symbol_id)
{

}

}

815 1 0
1個(gè)回答

不抽煙
<?php
namespace app\process;

use Workerman\Timer;

use think\facade\Db;

use Workerman\Protocols\Ws;
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;

class TaskDev
{

    public function onWorkerStart()
    {

        echo '監(jiān)聽(tīng)/n';
        // 以websocket協(xié)議連接遠(yuǎn)程websocket服務(wù)器
        $ws_connection = new AsyncTcpConnection("ws://xxx.xxx.xxx.xxx:xx/xxxxxx");
        // 每隔55秒向服務(wù)端發(fā)送一個(gè)opcode為0x9的websocket心跳
        $ws_connection->websocketPingInterval = 55;
        // 自定義http頭
        $ws_connection->headers = [];
        // 設(shè)置數(shù)據(jù)類(lèi)型,默認(rèn)BINARY_TYPE_BLOB為文本
        $ws_connection->websocketType = Ws::BINARY_TYPE_BLOB; // BINARY_TYPE_BLOB為文本 BINARY_TYPE_ARRAYBUFFER為二進(jìn)制
        // 當(dāng)TCP完成三次握手后
        $ws_connection->onConnect = function($connection){
            // echo "tcp connected\n";
        };
        // 當(dāng)websocket完成握手后
        $ws_connection->onWebSocketConnect = function(AsyncTcpConnection $con, $response) {
            echo "握手";
            echo $response;

            $data = [
                "params"=>[],
                "time"=>10
            ];
            $con->send(json_encode($data));

        };
        // 遠(yuǎn)程websocket服務(wù)器發(fā)來(lái)消息時(shí)
        $ws_connection->onMessage = function($connection, $data){
            echo $data;
            $res_data = [];
            $devlist = json_decode($data,true);

            foreach ($devlist as $key => $value) {

            }

        };
        // 連接上發(fā)生錯(cuò)誤時(shí),一般是連接遠(yuǎn)程websocket服務(wù)器失敗錯(cuò)誤
        $ws_connection->onError = function($connection, $code, $msg){
            echo "error: $msg\n";
        };
        // 當(dāng)連接遠(yuǎn)程websocket服務(wù)器的連接斷開(kāi)時(shí)
        $ws_connection->onClose = function($connection){
            echo "connection closed and try to reconnect\n";
            // 如果連接斷開(kāi),1秒后重連
            $connection->reConnect(1);
        };
        // 設(shè)置好以上各種回調(diào)后,執(zhí)行連接操作
        $ws_connection->connect();

    }

}
年代過(guò)于久遠(yuǎn),無(wú)法發(fā)表回答
??