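I'm using Redis publish/subscribe to implement real-time communication. Because a process blocks once it subscribes to a Redis channel, an asynchronous Redis client is required. With one in place the subscription succeeds, but after a while the Redis connection is dropped.
The code is as follows: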
public static function onMessage($client_id, $message)
{
    echo 'Message received before subscribing: ', $client_id, PHP_EOL;
    // A new Redis connection and subscription is created for every incoming message.
    self::$factory->createClient('redis://127.0.0.1:6379')->then(
        function (Client $client) use ($client_id) {
            $client->subscribe($client_id);
            $client->on('message', function ($channel, $message) {
                echo 'Message on ' . $channel . ': ' . $message . PHP_EOL;
            });
        },
        function ($exception) {
            echo $exception->getMessage();
        }
    );
    echo 'Message received after subscribing: ', $client_id, PHP_EOL;
    // Runs the event loop from inside the message callback.
    self::$loop->run();
}

public static function onWorkerStart($worker)
{
    self::$loop = Worker::getEventLoop();
    self::$factory = new Factory(self::$loop);
}
The error output is as follows:
process_timeout:
#1 /Volumes/Data/Code/WorkerFastCGI/vendor/react/event-loop/src/StreamSelectLoop.php(226): pcntl_signal_dispatch()
#2 /Volumes/Data/Code/WorkerFastCGI/vendor/react/event-loop/src/StreamSelectLoop.php(205): React\EventLoop\StreamSelectLoop->waitForStreamActivity(NULL)
#3 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Events/React/Base.php(252): React\EventLoop\StreamSelectLoop->run()
#4 /Volumes/Data/Code/WorkerFastCGI/server/Events.php(55): Workerman\Events\React\Base->run()
#5 : Server\Events::onMessage('7f0000011b58000...', '{"msg_type":500...')
#6 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/gateway-worker/src/BusinessWorker.php(395): call_user_func('Server\\Events::...', '7f0000011b58000...', '{"msg_type":500...')
#7 : GatewayWorker\BusinessWorker->onGatewayMessage(Object(Workerman\Connection\AsyncTcpConnection), Array)
#8 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Connection/TcpConnection.php(645): call_user_func(Array, Object(Workerman\Connection\AsyncTcpConnection), Array)
#9 : Workerman\Connection\TcpConnection->baseRead(Resource id #71)
#10 /Volumes/Data/Code/WorkerFastCGI/vendor/react/event-loop/src/StreamSelectLoop.php(238): call_user_func(Array, Resource id #71)
#11 /Volumes/Data/Code/WorkerFastCGI/vendor/react/event-loop/src/StreamSelectLoop.php(205): React\EventLoop\StreamSelectLoop->waitForStreamActivity(NULL)
#12 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Events/React/Base.php(252): React\EventLoop\StreamSelectLoop->run()
#13 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Events/React/Base.php(135): Workerman\Events\React\Base->run()
#14 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Worker.php(2226): Workerman\Events\React\Base->loop()
#15 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/gateway-worker/src/BusinessWorker.php(197): Workerman\Worker->run()
#16 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Worker.php(1352): GatewayWorker\BusinessWorker->run()
#17 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Worker.php(1184): Workerman\Worker::forkOneWorkerForLinux(Object(GatewayWorker\BusinessWorker))
#18 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Worker.php(1158): Workerman\Worker::forkWorkersForLinux()
#19 /Volumes/Data/Code/WorkerFastCGI/vendor/workerman/workerman/Worker.php(478): Workerman\Worker::forkWorkers()
#20 /Volumes/Data/Code/WorkerFastCGI/server.php(37): Workerman\Worker::runAll()
#21 {main}
For a Redis subscription, subscribing once in onWorkerStart is enough; there is no need to subscribe again on every onMessage call.
See https://github.com/walkor/phpsocket.io/issues/62
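A minimal sketch of that suggestion, not a definitive implementation. It assumes the asynchronous client is clue/redis-react (the thread does not name the library), that publishers use a hypothetical channel prefix `client:` followed by the client_id, and that only the BusinessWorker process with id 0 should subscribe so a message is not forwarded once per process. The helper `clientIdFromChannel` is introduced here purely for illustration.

```php
<?php
namespace Server;

use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use GatewayWorker\Lib\Gateway;
use Workerman\Worker;

class Events
{
    // Hypothetical naming scheme: publishers send to "client:<client_id>".
    const CHANNEL_PREFIX = 'client:';

    // Recover the client_id from a channel name; null if the channel does not match.
    public static function clientIdFromChannel($channel)
    {
        if (strpos($channel, self::CHANNEL_PREFIX) !== 0) {
            return null;
        }
        $id = substr($channel, strlen(self::CHANNEL_PREFIX));
        return ($id === false || $id === '') ? null : $id;
    }

    public static function onWorkerStart($worker)
    {
        // Assumption: one subscribing process is enough; skip the others.
        if ((int) $worker->id !== 0) {
            return;
        }

        $factory = new Factory(Worker::getEventLoop());
        $factory->createClient('redis://127.0.0.1:6379')->then(
            function (Client $client) {
                // One pattern subscription covers every per-client channel.
                $client->psubscribe(self::CHANNEL_PREFIX . '*');
                $client->on('pmessage', function ($pattern, $channel, $payload) {
                    $client_id = self::clientIdFromChannel($channel);
                    if ($client_id !== null) {
                        Gateway::sendToClient($client_id, $payload);
                    }
                });
                $client->on('close', function () {
                    echo 'Redis connection closed', PHP_EOL;
                });
            },
            function ($exception) {
                echo $exception->getMessage(), PHP_EOL;
            }
        );
        // No $loop->run() here: Workerman enters the loop itself
        // after onWorkerStart returns.
    }
}
```

With this layout onMessage no longer touches Redis at all, so nothing in it creates connections or re-enters the event loop.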
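Thanks a lot for the pointer. My idea was to subscribe and unsubscribe dynamically based on client requests. After thinking it over, though, Gateway's sendToGroup can achieve the same thing. I'm not sure which of the two approaches is better. Would using Redis publish/subscribe for end-to-end one-to-many communication cause any problems?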
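For comparison, here is a rough sketch of the sendToGroup variant, where joining and leaving a group plays the role of subscribing and unsubscribing. The class name `GroupEvents`, the helper `parseRequest`, and the JSON fields `action`, `group`, and `body` are all hypothetical; the real message format in this project (which carries a `msg_type` field) would need its own mapping.

```php
<?php
namespace Server;

use GatewayWorker\Lib\Gateway;

class GroupEvents
{
    // Decode and validate a request; returns [action, group, body] or null.
    public static function parseRequest($message)
    {
        $data = json_decode($message, true);
        if (!is_array($data) || !isset($data['action'], $data['group'])) {
            return null;
        }
        if (!in_array($data['action'], ['join', 'leave', 'publish'], true)) {
            return null;
        }
        if (!is_string($data['group']) || $data['group'] === '') {
            return null;
        }
        $body = isset($data['body']) ? $data['body'] : '';
        return [$data['action'], $data['group'], $body];
    }

    public static function onMessage($client_id, $message)
    {
        $request = self::parseRequest($message);
        if ($request === null) {
            return; // Ignore malformed requests in this sketch.
        }
        list($action, $group, $body) = $request;

        switch ($action) {
            case 'join':    // Equivalent of "subscribe".
                Gateway::joinGroup($client_id, $group);
                break;
            case 'leave':   // Equivalent of "unsubscribe".
                Gateway::leaveGroup($client_id, $group);
                break;
            case 'publish': // Deliver to every client currently in the group.
                Gateway::sendToGroup($group, json_encode(['group' => $group, 'body' => $body]));
                break;
        }
    }
}
```

This keeps everything inside GatewayWorker and needs no extra Redis connection. Redis publish/subscribe may still be worth keeping if messages also have to reach or come from processes outside the GatewayWorker cluster.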