WebSocket 當(dāng)前連接收到了不屬于它的訂閱數(shù)據(jù),新手,不知道是變量污染還是其他問(wèn)題,求兄弟們幫忙看一下。
protected $socket = 'websocket://0.0.0.0:8090';
protected $clients = [];
/**
* 連接成功事件
* @param $connection
*/
public function onConnect($connection)
{
$connection->id = $this->getUniqueConnectionId($connection);
// 為每個(gè)連接初始化一個(gè)訂閱頻道的數(shù)組
$this->clients[$connection->id] = [
'connection' => $connection,
'subscriptions' => [], // 存儲(chǔ)訂閱的頻道信息
'timerId' => ''
];
}
/**
* 接收消息事件
* @param $connection
* @param $data
*/
public function onMessage($connection, $data)
{
// 解析接收到的數(shù)據(jù)
$message = json_decode($data, true);
// 檢查解析結(jié)果是否為null,即是否解析失敗
if ($message === null) {
$connection->send('Failed to decode JSON data.'); // 發(fā)送解析失敗響應(yīng)
return;
}
if (!isset($message['type'])) {
return;
}
$queryId = Random::uuid();
$queryParam['symbol_id'] = $message['symbol_id'] ?? null;
$queryParam['channel'] = $message['channel'] ?? null;
// 現(xiàn)在 $message 包含解析后的數(shù)組數(shù)據(jù),您可以繼續(xù)處理它
switch ($message['type']) {
case 'subscribe':
// 訂閱消息
$this->subscribe($connection, $queryParam);
break;
case 'unsubscribe':
// 取消訂閱消息
$this->unsubscribe($connection, $queryParam);
break;
case 'pong':
// 收到客戶(hù)端的pong響應(yīng)
break;
default:
// 其他消息類(lèi)型,可以根據(jù)需求進(jìn)行處理
break;
}
}
// 加入頻道
protected function subscribe($connection, $queryParam)
{
// 停止之前的定時(shí)器
$this->clearTimer($connection->id);
// 添加訂閱信息到連接中
$this->clients[$connection->id]['subscriptions'][] = $queryParam;
// 創(chuàng)建一個(gè)5秒后發(fā)送數(shù)據(jù)的定時(shí)器
$timerId = Timer::add(5, function () use ($connection, $queryParam) {
// 獲取當(dāng)前連接的訂閱信息
$subscriptions = $this->clients[$connection->id]['subscriptions'];
// 檢查當(dāng)前連接是否訂閱了當(dāng)前頻道
$isSubscribed = false;
foreach ($subscriptions as $subscription) {
if ($subscription['channel'] === $queryParam['channel'] && $subscription['symbol_id'] === $queryParam['symbol_id']) {
$isSubscribed = true;
break;
}
}
if ($isSubscribed) {
// 發(fā)送數(shù)據(jù)給當(dāng)前連接
$queryResult = $this->getDataFromCache($queryParam['channel'], $queryParam['symbol_id']);
if (!empty($queryResult)) {
$connection->clearBuffer();
$connection->send(json_encode($queryResult));
}
}
}, [], true);
// 記錄定時(shí)器ID
$this->clients[$connection->id]['timerId'] = $timerId;
}
// 檢查連接是否訂閱了指定的頻道
protected function isSubscribed($connection, $channel, $symbol_id)
{
$subscriptions = $this->clients[$connection->id]['subscriptions'];
foreach ($subscriptions as $subscription) {
if ($subscription['channel'] === $channel && $subscription['symbol_id'] === $symbol_id) {
return true;
}
}
return false;
}
// 取消頻道時(shí)移除連接及其訂閱信息
protected function unsubscribe($connection, $queryParam)
{
$subscriptions = &$this->clients[$connection->id]['subscriptions'];
foreach ($subscriptions as $key => $subscription) {
if ($subscription['channel'] === $queryParam['channel'] && $subscription['symbol_id'] === $queryParam['symbol_id']) {
unset($subscriptions[$key]);
break;
}
}
}
/**
* 心跳事件
* @param $connection
*/
public function onHeartbeat($connection)
{
// 可以在這里處理心跳邏輯,例如檢查連接是否存活等
}
/**
* 連接關(guān)閉事件
* @param $connection
*/
public function onClose($connection)
{
$this->clearTimer($connection->id);
unset($this->clients[$connection->id]);
}
/**
* 清除連接的定時(shí)器
* @param $connectionId
*/
protected function clearTimer($connectionId)
{
if (isset($this->clients[$connectionId]['timerId'])) {
Timer::del($this->clients[$connectionId]['timerId']);
}
}
/**
* 獲取唯一的連接ID,可以添加worker ID前綴
* @param $connection
* @return string
*/
protected function getUniqueConnectionId($connection)
{
return $connection->worker->id . '_' . $connection->id;
}
// 從緩存中獲取數(shù)據(jù)
protected function getDataFromCache($channel, $symbol_id)
{
}
}
<?php
namespace app\process;
use Workerman\Timer;
use think\facade\Db;
use Workerman\Protocols\Ws;
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
class TaskDev
{
public function onWorkerStart()
{
echo '監(jiān)聽(tīng)/n';
// 以websocket協(xié)議連接遠(yuǎn)程websocket服務(wù)器
$ws_connection = new AsyncTcpConnection("ws://xxx.xxx.xxx.xxx:xx/xxxxxx");
// 每隔55秒向服務(wù)端發(fā)送一個(gè)opcode為0x9的websocket心跳
$ws_connection->websocketPingInterval = 55;
// 自定義http頭
$ws_connection->headers = [];
// 設(shè)置數(shù)據(jù)類(lèi)型,默認(rèn)BINARY_TYPE_BLOB為文本
$ws_connection->websocketType = Ws::BINARY_TYPE_BLOB; // BINARY_TYPE_BLOB為文本 BINARY_TYPE_ARRAYBUFFER為二進(jìn)制
// 當(dāng)TCP完成三次握手后
$ws_connection->onConnect = function($connection){
// echo "tcp connected\n";
};
// 當(dāng)websocket完成握手后
$ws_connection->onWebSocketConnect = function(AsyncTcpConnection $con, $response) {
echo "握手";
echo $response;
$data = [
"params"=>[],
"time"=>10
];
$con->send(json_encode($data));
};
// 遠(yuǎn)程websocket服務(wù)器發(fā)來(lái)消息時(shí)
$ws_connection->onMessage = function($connection, $data){
echo $data;
$res_data = [];
$devlist = json_decode($data,true);
foreach ($devlist as $key => $value) {
}
};
// 連接上發(fā)生錯(cuò)誤時(shí),一般是連接遠(yuǎn)程websocket服務(wù)器失敗錯(cuò)誤
$ws_connection->onError = function($connection, $code, $msg){
echo "error: $msg\n";
};
// 當(dāng)連接遠(yuǎn)程websocket服務(wù)器的連接斷開(kāi)時(shí)
$ws_connection->onClose = function($connection){
echo "connection closed and try to reconnect\n";
// 如果連接斷開(kāi),1秒后重連
$connection->reConnect(1);
};
// 設(shè)置好以上各種回調(diào)后,執(zhí)行連接操作
$ws_connection->connect();
}
}