21年的時候有機(jī)會第一次接觸了webman,初次接觸這種模式的框架時,感覺還是挺新穎的。目前已經(jīng)使用了1年多,感覺挺不錯的。之前在論壇看到了第三方項目中對PHPForker的介紹,于是有機(jī)會學(xué)習(xí)了一下,并重新回過頭來看webman的啟動流程。
以下流程圖是個人理解,有不對的地方請指出來,我將修正它。
同時聲明下該流程的幾個點(diǎn)
如果圖片查看不完整的話,看這里img
<?php
declare(strict_types=1);
namespace Stream;
class Timer
{
/**
* @var SplPriorityQueue
*/
private static SplPriorityQueue $queue;
public function __construct()
{
// 初始化優(yōu)先隊列
self::$queue = new SplPriorityQueue();
// 定義extra
self::$queue->setExtractFlags(SplPriorityQueue::EXTR_BOTH);
}
public function init(): void
{
// 注冊信號
pcntl_signal(SIGALRM, [$this, 'handler'], false);
}
/**
* @param callable $func 回調(diào)函數(shù)
* @param float $interval
* @return void
*/
public function addTimer(callable $func, float $interval): void
{
// 如果不存在任務(wù),則創(chuàng)建一個信號
if (self::$queue->count() === 0) {
pcntl_alarm(1);
}
$now = hrtime(true) / 1e-9;
$nextRunTime = $now + $interval;
self::$queue->insert(
[
'interval' => $interval,
'func' => $func
],
-$nextRunTime
);
$count = self::$queue->count();
printf('queue count:%d' . PHP_EOL, $count);
}
public function handler(): void
{
pcntl_alarm(1);
$this->tick();
}
public function tick(): void
{
$count = self::$queue->count();
$now = hrtime(true) / 1e-9;
while ($count--) {
$data = self::$queue->top();
$runTime = -$data['priority'];
if ($runTime <= $now) {
self::$queue->extract();
call_user_func($data['data']['func']);
$this->addTimer($data['data']['func'], $data['data']['interval']);
}
}
}
}
class Server
{
private string $server = 'tcp://127.0.0.1:5501';
public function listenSelect(): void
{
$mainSocket = stream_socket_server($this->server, $errorCode, $errorMsg);
stream_set_blocking($mainSocket, false);
$read = [];
$write = $except = null;
$read[$this->server] = $mainSocket;
while (true) {
// 調(diào)用等待信號的處理器,Timer部分
pcntl_signal_dispatch();
$tmpRead = $read;
$tmpWrite = $write;
try {
$select = stream_select($tmpRead, $tmpWrite, $except, 1, 0);
} catch (\Throwable $e) {
printf($e->getMessage() . PHP_EOL);
continue;
}
if ($select === false) {
continue;
}
foreach ($tmpRead as $sid => $socket) {
if ($socket === $mainSocket) {
// 說明有新的鏈接進(jìn)入
$newSocket = stream_socket_accept($socket, 0, $newSocketPeer);
if ($newSocket === false) {
print '接受connection失?。? . $newSocketPeer . PHP_EOL;
continue;
}
$socketAddress = 'tcp://' . stream_socket_get_name($newSocket, true);
$read[$socketAddress] = $newSocket;
print '接受connection成功:' . $newSocketPeer . PHP_EOL;
}
else {
// 從客戶端讀取數(shù)據(jù),如何確定是當(dāng)前這個client發(fā)來的數(shù)據(jù)?
// 因為select拿到數(shù)據(jù),說明一定是有新的數(shù)據(jù)被讀到(返回的read是有新數(shù)據(jù)到達(dá)的socket)
$msg = fread($socket, 65535);
if ($msg === '' || $msg === false) {
// 移除read
foreach ($read as $k => $v) {
if ($socket === $v) {
unset($read[$k]);
}
}
fclose($socket);
print '關(guān)閉connection:' . $sid . PHP_EOL;
} else {
// 打印信息,并寫回
print '收到connection消息:' . $msg . PHP_EOL;
if (in_array($socket, $read, true)) {
fwrite($socket, '已收到信息,' . date('H:i:s') . PHP_EOL);
}
}
}
}
// usleep(100000);
}
}
}
$timer = new Timer();
$timer->addTimer(
static function() {
$time = time();
printf('this is timer: hello world: %d' . PHP_EOL, $time);
},
1
);
$timer->init();
(new Server())->listenSelect();
起初在本地測試的過程中,針對stream_select(&$read, &$write, &$except, $tv_sec, $tv_usec),我學(xué)著的PHPForker的做法,將新連接進(jìn)來的socket不僅放入$read中,同時也放入$write中,這樣一旦啟動之后新連接進(jìn)入后,原本的stream_select應(yīng)該阻塞直到超時的,卻并沒有被阻塞。通過debug調(diào)試發(fā)現(xiàn),這時每次select都有write返回,也就是該新加進(jìn)去的socket。
這里起初,我針對stream_select的什么時候返回的理解,除了超時場景下返回外,另外的就是監(jiān)聽的$read、$write、$except的文件描述符有新變化才返回。所以當(dāng)?shù)谝淮螌阎鲃颖O(jiān)聽的socket放進(jìn)到read時(這里后面統(tǒng)一稱為mainSocket),如果此時有客戶端連接進(jìn)來,那么下一次在select監(jiān)聽的時候,就會發(fā)現(xiàn)read中的mainSocket有就緒的狀態(tài),所以這時我們將受理連接,也就是通過stream_socket_accept去接收,得到一個客戶端的socket。如果此時將該socket不僅放入$read去監(jiān)聽,同時也放到$write去監(jiān)聽,那接下來每次select的時候,都會立即返回。為什么呢?
后面我翻到了有其他人也有同樣的疑惑http://wtbis.cn/q/1307,看到這個我才大概明白。前面放進(jìn)去的$write一直處于可寫狀態(tài),所以每次select都能拿到。
這里找到了幾篇解說挺好的文章,可以參考下,它有一個系列PHP socket初探,PHP Socket初探—-先從一個簡單的socket服務(wù)器開始
1、php socket通信中stream_select方法的理解
2、stream_select ($read, $write, $except, $timeout ); 函數(shù)問題
3、PHP Socket初探—-先從一個簡單的socket服務(wù)器開始
原文見這里
"PHP Socket初探—-先從一個簡單的socket服務(wù)器開始":有沒完整的php代碼?
<?php
class SocketServer
{
public $socket;
public $all_sockets = [];
public function __construct(string $address)
{
//創(chuàng)建監(jiān)聽socket服務(wù) 資源句柄 ,,resource|false
$this->socket = \stream_socket_server($address, $php_errorcode, $php_errormsg);
if ($this->socket == false) {
throw new \Exception('創(chuàng)建失??!', $php_errorcode, $php_errormsg);
}
\stream_set_blocking($this->socket, false);
//當(dāng)前初始化的第一個socket服務(wù) 資源句柄放入 資源池
$this->all_sockets[intval($this->socket)] = $this->socket;
echo ('=== 服務(wù)器啟動|' . $address.PHP_EOL);
}
public function run()
{
//死循環(huán)
while (true) {
$write=$except=null;
$allSocket=$this->all_sockets;
\stream_select($allSocket, $write, $except, 60);
foreach ($allSocket as $index => $socket) {
//如果 streamsocket 服務(wù)句柄 == 當(dāng)前socket資源池的id,說明有新連接
if ($this->socket === $socket) {
// 接受由 stream_socket_server() 創(chuàng)建的套接字連接 timeout/覆蓋默認(rèn)的套接字接受的超時時限。輸入的時間需以秒為單位
// resource|false
$new_conn_socket = \stream_socket_accept($this->socket);
if ($new_conn_socket == false) {
//可能是建立連接異常
continue;
}
$this->onConn($new_conn_socket);
$this->all_sockets[intval($new_conn_socket)] = $new_conn_socket;
echo '=== 新連接建立'.(int)($new_conn_socket).PHP_EOL;
} else {
//如果不是新的連接,則看看有無數(shù)據(jù)過來,讀取長度 65536
$buff = fread($socket, 0xFFFF);
// 不能是 == ''
if ($buff === '' || $buff === false) {
//客戶端已斷開
echo '=== 斷開連接'.intval($socket).PHP_EOL;
$this->onClose($socket);
unset($this->all_sockets[intval($socket)]);
fclose($socket);
continue;
}
//處理發(fā)來的數(shù)據(jù)
$this->onMessage($socket, $buff);
}
}
}
}
public function onConn(mixed $socket)
{
}
public function onClose(mixed $socket)
{
}
public function onMessage(mixed $socket, $buff)
{
$body = 'hello word';
$header = [
'HTTP/1.1 200 OK',
'Connection: keep-alive',
'Niubi: test123',
'Content-length:' . strlen($body)
];
$header_string = \implode(chr(0x0D) . chr(0x0A), $header);
$data = $header_string . chr(0x0D) . chr(0x0A) . chr(0x0D) . chr(0x0A) . $body;
// echo '用戶數(shù)據(jù):'.$buff.PHP_EOL;
fwrite($socket, $data);
unset($this->all_sockets[intval($socket)]);
fclose($socket);
}
}
$a = new SocketServer('tcp://0.0.0.0:88');
$a->run();;
很少有這么底層的好文了