new Server();//Channel的服務(wù)器
$worker1 = new Worker();
$worker1->onStart=function(){
Client::on(xxx,function($msg){
echo "這里不能打印";
}
}
$worker2 = new Worker();
$worker2->onStart = function(){
Client::publish(xxx,data);//這里publish。
}
Worker::runAll();
Client.php
onRemoteMessage 打印,沒(méi)有打印到publish的內(nèi)容
2個(gè)worker間題。
同個(gè)worker內(nèi)正常。
真實(shí)環(huán)境是:
worker2用來(lái)
$redis->subscribe(xxx,function($instrance, $channel, $msg){
Client::publish($channel, $msg);
}
worker1用
Client::on($channel, function($msg){
// 來(lái)處理。因?yàn)閞edis->subscribe是阻塞的,不能在worker1內(nèi)用。worker1要處理其它業(yè)務(wù)的。不能阻塞
}
不能在onWorkerStart里立刻publish,因?yàn)榱硗獾倪M(jìn)程可能還沒(méi)有運(yùn)行到onWorkerStart,
可能還沒(méi)運(yùn)行到Client::on(xxx, ...)
另外兩個(gè)worker要在onWorkerStart里調(diào)用下Client::connect('channel_server_ip')下,
1:我看Client::on和Client::publish內(nèi)部實(shí)現(xiàn)都有先調(diào)用Client::connect();//我的Server,ip和port都是默認(rèn)的,Client::connect的ip和port也是默認(rèn)的。這看來(lái)去應(yīng)該是正常的。
2:不能在onWorkerStart里立刻$redis->publish( Client::publish),這個(gè)邏輯我這邊業(yè)務(wù)是允許的。
現(xiàn)在的情況是,worker2->onWorkerStart里$redis->subscribe('xx', function($msg){
Client::publish(xxxx,$msg);//這里要給$worker1->onWorerStart里的Client::on(xxxx,這里處理);
})
看Client::publish是收到打印了。但Server::這邊是沒(méi)收到。Client::onRemoteMessage這里也沒(méi)收到Client::publish發(fā)來(lái)的數(shù)據(jù)。
我請(qǐng)問(wèn)一下。
當(dāng)onWorkerStart里使用了redis->subscribe,會(huì)不會(huì)影響 這個(gè)回調(diào)里的Client::publish 發(fā)送數(shù)據(jù)到Server
onWorkStart不能redis-subscribe(會(huì)導(dǎo)致Client::publish發(fā)不出數(shù)據(jù)),有何辦法解決這個(gè)問(wèn)題嗎?
最簡(jiǎn)單的方法是不直接調(diào)用Channel/Client::publish,因?yàn)镃hannel/Client是異步非阻塞的,你的redis-subscribe會(huì)阻塞整個(gè)進(jìn)程,導(dǎo)致Channel/Client無(wú)法處理異步數(shù)據(jù)。你可以把Channel/Client::publish部分代碼抽離出來(lái),手寫(xiě)代碼用阻塞的方式發(fā)送。
你可以看下Client.php publish方法的代碼,改為用阻塞的方式publish事件。
代碼類(lèi)似
<?php
function publish($events, $data)
{
$client = stream_socket_client('tcp://channel_server_ip:channel_server_port');
$buffer = serialize(array('type' => 'publish', 'channels'=>(array)$events, 'data' => $data));
// frame協(xié)議格式,頭部4個(gè)字節(jié)是包的長(zhǎng)度
$all_buffer = pack('N', strlen($buffer)+4).$buffer;
fwrite($client, $all_buffer);
}