使用event+pnctl 模擬workerman寫了一個demo,但是發(fā)現(xiàn)一個問題
就是多次tcp連接之后,開4個線程,第四次連接tcp就不返回?cái)?shù)據(jù)了
開10個線程,連接斷開之后,大概7-8次就會出現(xiàn)數(shù)據(jù)不返回
<?php
class Worker{
//監(jiān)聽socket
protected $socket = NULL;
//連接事件回調(diào)
public $onConnect = NULL;
public $reusePort=1;
//接收消息事件回調(diào)
public $onMessage = NULL;
public $workerNum=10; //子進(jìn)程個數(shù)
public $allSocket; //存放所有socket
public $addr;
protected $worker_pid; //子進(jìn)程pid
protected $master_pid;//主進(jìn)程id
public function __construct($socket_address) {
//監(jiān)聽地址+端口
$this->addr=$socket_address;
$this->master_pid=posix_getpid();
}
public function start() {
//獲取配置文件
$this->fork($this->workerNum);
$this->monitorWorkers(); //監(jiān)視程序,捕獲信號,監(jiān)視worker進(jìn)程
}
public function signalHandler($sigo){
switch ($sigo){
case SIGUSR1:
$this->reload();
echo "收到重啟信號";
break;
}
}
public function fork($worker_num){
for ($i=1;$i<$worker_num;$i++){
$pid=pcntl_fork(); //創(chuàng)建成功會返回子進(jìn)程id
if($pid<0){
exit('創(chuàng)建失敗');
}else if($pid>0){
//父進(jìn)程空間,返回子進(jìn)程id
$this->worker_pid[]=$pid;
}else{ //返回為0子進(jìn)程空間
$this->accept();//子進(jìn)程負(fù)責(zé)接收客戶端請求
exit;
}
}
//放在父進(jìn)程空間,結(jié)束的子進(jìn)程信息,阻塞狀態(tài)
}
/**
* 捕獲信號
* 監(jiān)視worker進(jìn)程.拉起進(jìn)程
*/
public function monitorWorkers(){
//注冊信號事件回調(diào),是不會自動執(zhí)行的
// reload
pcntl_signal(SIGUSR1, array($this, 'signalHandler'),false); //重啟woker進(jìn)程信號
while (1){
// 當(dāng)發(fā)現(xiàn)信號隊(duì)列,一旦發(fā)現(xiàn)有信號就會觸發(fā)進(jìn)程綁定事件回調(diào)
\pcntl_signal_dispatch();
$status=0;
$pid = pcntl_wait($status,\WUNTRACED); //當(dāng)信號到達(dá)之后就會被中斷
\pcntl_signal_dispatch();
//進(jìn)程重啟的過程當(dāng)中會有新的信號過來,如果沒有調(diào)用pcntl_signal_dispatch,信號不會被處理
}
}
public function accept(){
$opts = array(
'socket' => array(
'backlog' =>10240, //成功建立socket連接的等待個數(shù)
),
);
$context = stream_context_create($opts);
//開啟多端口監(jiān)聽,并且實(shí)現(xiàn)負(fù)載均衡
stream_context_set_option($context,'socket','so_reuseport',1);
stream_context_set_option($context,'socket','so_reuseaddr',1);
$this->socket=stream_socket_server($this->addr,$errno,$errstr,STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,$context);
stream_set_blocking($this->socket,false);//非阻塞
$base=new EventBase();
$ss =$this->socket;
//監(jiān)聽服務(wù)端的socket
$event=new Event($base,$ss,Event::PERSIST |Event::READ | Event::WRITE,function ($ss){
$clientSocket = stream_socket_accept($ss);
//觸發(fā)事件的連接的回調(diào)
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
$base=new EventBase();
//監(jiān)聽客戶端socket
$event=new Event($base,$clientSocket,Event::PERSIST |Event::READ | Event::WRITE,function ($clientSocket){
$buffer=fread($clientSocket,65535);
if(empty($buffer)){
if(!is_resource($clientSocket) || feof($clientSocket) ){
//觸發(fā)關(guān)閉事件
fclose($clientSocket);
}
}
//正常讀取到數(shù)據(jù),觸發(fā)消息接收事件,響應(yīng)內(nèi)容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$clientSocket,$buffer);
}
});
$event->add(); //加入事件監(jiān)聽
$base->loop();
});
$event->add(); //加入事件監(jiān)聽
$base->loop(); //調(diào)度掛起事件監(jiān)聽
}
/**
* 重啟worker進(jìn)程
*/
public function reload(){
foreach ($this->worker_pid as $index=>$pid){
posix_kill($pid,SIGKILL); //結(jié)束進(jìn)程
var_dump("殺掉的子進(jìn)程",$pid);
unset($this->worker_pid[$index]);
$this->fork(1); //重新拉起worker
}
}
//捕獲信號之后重啟worker進(jìn)程
}
$worker = new Worker('tcp://0.0.0.0:9800');
//開啟多進(jìn)程的端口監(jiān)聽
$worker->reusePort = true;
//連接事件
$worker->onConnect = function ($fd) {
//echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;
};
$worker->onTask = function ($fd) {
//echo '連接事件觸發(fā)',(int)$fd,PHP_EOL;
};
//消息接收
$worker->onMessage = function ($conn, $message) {
//事件回調(diào)當(dāng)中寫業(yè)務(wù)邏輯
//var_dump($conn,$message);
$content=$message;
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
$http_resonse .= "Connection: keep-alive\r\n"; //連接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($conn, $http_resonse);
if(empty($message)){
if(!is_resource($conn) || feof($message) ){
//觸發(fā)關(guān)閉事件
fclose($conn);
}
}
};
$worker->start(); //啟動
問下是哪里出了問題