?? 最新更新于2020-06-02
最早接觸reactor模型的時(shí)候,應(yīng)該是在參與一個(gè)叫zanphp項(xiàng)目的時(shí)候,他是一個(gè)類似swoole的php拓展項(xiàng)目,當(dāng)然它們之間的故事我就不多說了,也有一些沖突和迷茫;在那個(gè)時(shí)間段的PHP發(fā)展還是很蓬勃向上的,那時(shí)候的滴滴、有贊、百度都有很多很多PHP項(xiàng)目,那時(shí)候的原生PHP有許多許多的瓶頸,所以國(guó)內(nèi)那時(shí)候涌現(xiàn)了很多使用C來為PHP加速的開發(fā)者。
隨著PHP慢慢發(fā)展,PHP的特性越來越豐富,性能也越來越好,而PECL庫(kù)里的拓展也經(jīng)歷了這么多年的洗禮和沖刷,越來越穩(wěn)固?,F(xiàn)如今更多的PHP開發(fā)者圍繞著原生PHP做業(yè)務(wù),其實(shí)我覺得這反而是一個(gè)好現(xiàn)象,專業(yè)的人做專業(yè)的事,更多熱愛它的人愿意留下做貢獻(xiàn),社區(qū)雖然沒有像那段時(shí)間一樣的向四面八方高速發(fā)展,但體現(xiàn)出來的是更有方向感的一種進(jìn)步。
在這樣一種狀況下,外加上我接觸了比較多其他的語言和項(xiàng)目,激發(fā)了我想利用現(xiàn)有的擴(kuò)展結(jié)合原生PHP去做一些看起來厲害的、用起來騷氣的一些庫(kù)或者組件,做一些可能重復(fù)造輪子的事兒;當(dāng)然,一方面是希望盡可能的做一些新輪子,另一方面也是希望能夠通過實(shí)踐,更深的理解某些知識(shí)。
我打算做的是一個(gè)輕量的任務(wù)調(diào)度服務(wù),原本計(jì)劃是Golang做開發(fā),在業(yè)內(nèi)大部分人的評(píng)價(jià)來說,Golang像是一個(gè)高級(jí)的PHP/高級(jí)Python;其實(shí)用Golang做一個(gè)任務(wù)調(diào)度服務(wù)來說是件比較簡(jiǎn)單的事兒,而且市面上也有比較多的調(diào)度服務(wù),這其實(shí)是一個(gè)重復(fù)輪子的事兒,考慮到這個(gè)情況,我思考了一下,打算先用PHP實(shí)現(xiàn)一個(gè)這樣的服務(wù)。
通常來說PHP語言都圍繞著PHP-FPM來做的開發(fā),畢竟PHP業(yè)內(nèi)最工業(yè)化的架構(gòu)就是LNMP/LAMP,但是這就不符合“輕量”這一特性了,所以我把目光鎖定在了workerman、amphp、reactphp上;但為了深入了解這些PHP中優(yōu)秀的reacto模型框架,我決定自己擼一個(gè)event-loop;
我分別研究了 walkor/workerman 、 reactphp/event-loop,發(fā)現(xiàn)了一些比較有趣的事兒;
一方面要看這個(gè)PHP框架的C含量有多少了,越多性能就越高。
這樣的思維普遍存在在各種語言/框架中,比如PHP的YAF、Phalcon、swoole等,比如Python早期的cpython、numpy等;畢竟C語言作為祖師爺?shù)拇嬖冢N近系統(tǒng),對(duì)于內(nèi)存、系統(tǒng)的調(diào)用來的更直接,只要編碼足夠優(yōu)秀,性能就可以足夠優(yōu)秀。
一方面要關(guān)注開發(fā)的模式
通常來說,我們選用一款開發(fā)框架都是用于開發(fā)業(yè)務(wù)的,這里面會(huì)面對(duì)和使用各種各樣已經(jīng)存在的輪子,比如PDO、composer組件等,這些組件/拓展功能的存在已經(jīng)有一定的歷史了,他們從過去到現(xiàn)在的發(fā)展也大多聚焦在blocking-IO的模式上,也就是同步阻塞的模式,因?yàn)檫@種模式更簡(jiǎn)單直接,每一行的代碼都是順序進(jìn)行下去,好掌控,也好排查,這樣的開發(fā)模式可以讓程序員降低不少的心智負(fù)擔(dān),聚焦在業(yè)務(wù)上。
我們回過頭聚焦 reactphp和workerman;我們大部分的開發(fā)模式像上述說的,我們都會(huì)聚焦在BIO的模式上,一方面是輪子是這么做的,一方面是這樣的開發(fā)速度更快更直接;拋開進(jìn)程不說,如果我們?cè)趩芜M(jìn)程下使用reactor模型,那么這樣的event-loop就會(huì)退化成和PHP-FPM一樣的阻塞等待的程序,workerman是如何做的呢?多進(jìn)程,多開了比較多的進(jìn)程來并行處理業(yè)務(wù),也利用了linux的端口復(fù)用(SO_REUSEADDR、SO_REUSEPORT);reactphp利用的是異步編程的方式,盡可能地不阻塞event-loop;那么這里reactphp的代價(jià)就是需要為這套編程方式實(shí)現(xiàn)許許多多的異步客戶端,做很多輪子的工作,這里包含解決回調(diào)地獄的 reactphp/promise 等,因?yàn)橐坏┳枞薳vent-loop,它便會(huì)退化。
除此之外,還有提到的含C量上,以下我會(huì)用代碼解釋這個(gè)含C量具體提現(xiàn)在哪兒:
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author 有個(gè)鬼<42765633@qq.com>
* @copyright 有個(gè)鬼<42765633@qq.com>
* @link http://wtbis.cn/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Events;
use Workerman\Worker;
/**
* libevent eventloop
*/
class Event implements EventInterface
{
/**
* Event base.
* @var object
*/
protected $_eventBase = null;
/**
* All listeners for read/write event.
* @var array
*/
protected $_allEvents = array();
/**
* Event listeners of signal.
* @var array
*/
protected $_eventSignal = array();
/**
* All timer event listeners.
* [func, args, event, flag, time_interval]
* @var array
*/
protected $_eventTimer = array();
/**
* Timer id.
* @var int
*/
protected static $_timerId = 1;
/**
* construct
* @return void
*/
public function __construct()
{
if (\class_exists('\\\\EventBase', false)) {
$class_name = '\\\\EventBase';
} else {
$class_name = '\EventBase';
}
$this->_eventBase = new $class_name();
}
/**
* @see EventInterface::add()
*/
public function add($fd, $flag, $func, $args=array())
{
if (\class_exists('\\\\Event', false)) {
$class_name = '\\\\Event';
} else {
$class_name = '\Event';
}
switch ($flag) {
case self::EV_SIGNAL:
$fd_key = (int)$fd;
$event = $class_name::signal($this->_eventBase, $fd, $func);
if (!$event||!$event->add()) {
return false;
}
$this->_eventSignal[$fd_key] = $event;
return true;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
$param = array($func, (array)$args, $flag, $fd, self::$_timerId);
$event = new $class_name($this->_eventBase, -1, $class_name::TIMEOUT|$class_name::PERSIST, array($this, "timerCallback"), $param);
if (!$event||!$event->addTimer($fd)) {
return false;
}
$this->_eventTimer[self::$_timerId] = $event;
return self::$_timerId++;
default :
$fd_key = (int)$fd;
$real_flag = $flag === self::EV_READ ? $class_name::READ | $class_name::PERSIST : $class_name::WRITE | $class_name::PERSIST;
$event = new $class_name($this->_eventBase, $fd, $real_flag, $func, $fd);
if (!$event||!$event->add()) {
return false;
}
$this->_allEvents[$fd_key][$flag] = $event;
return true;
}
}
/**
* @see Events\EventInterface::del()
*/
public function del($fd, $flag)
{
switch ($flag) {
case self::EV_READ:
case self::EV_WRITE:
$fd_key = (int)$fd;
if (isset($this->_allEvents[$fd_key][$flag])) {
$this->_allEvents[$fd_key][$flag]->del();
unset($this->_allEvents[$fd_key][$flag]);
}
if (empty($this->_allEvents[$fd_key])) {
unset($this->_allEvents[$fd_key]);
}
break;
case self::EV_SIGNAL:
$fd_key = (int)$fd;
if (isset($this->_eventSignal[$fd_key])) {
$this->_eventSignal[$fd_key]->del();
unset($this->_eventSignal[$fd_key]);
}
break;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
if (isset($this->_eventTimer[$fd])) {
$this->_eventTimer[$fd]->del();
unset($this->_eventTimer[$fd]);
}
break;
}
return true;
}
/**
* Timer callback.
* @param int|null $fd
* @param int $what
* @param int $timer_id
*/
public function timerCallback($fd, $what, $param)
{
$timer_id = $param[4];
if ($param[2] === self::EV_TIMER_ONCE) {
$this->_eventTimer[$timer_id]->del();
unset($this->_eventTimer[$timer_id]);
}
try {
\call_user_func_array($param[0], $param[1]);
} catch (\Exception $e) {
Worker::stopAll(250, $e);
} catch (\Error $e) {
Worker::stopAll(250, $e);
}
}
/**
* @see Events\EventInterface::clearAllTimer()
* @return void
*/
public function clearAllTimer()
{
foreach ($this->_eventTimer as $event) {
$event->del();
}
$this->_eventTimer = array();
}
/**
* @see EventInterface::loop()
*/
public function loop()
{
$this->_eventBase->loop();
}
/**
* Destroy loop.
*
* @return void
*/
public function destroy()
{
$this->_eventBase->exit();
}
/**
* Get timer count.
*
* @return integer
*/
public function getTimerCount()
{
return \count($this->_eventTimer);
}
}
以上的代碼中以 loop() 這段代碼來說,workerman直接將事件的循環(huán)交給了ext-event拓展,因?yàn)閑xt-event底層使用的是libevent庫(kù),是一個(gè)C語言的事件循環(huán)庫(kù),你可以理解為把一個(gè)循環(huán)體交給了C語言。
C語言的循環(huán)肯定是比PHP的循環(huán)效率更高的,在同等時(shí)間單位下,循環(huán)的次數(shù)肯定是比PHP要高不少的。
<?php
namespace React\EventLoop;
use BadMethodCallException;
use Event;
use EventBase;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Timer\Timer;
use SplObjectStorage;
/**
* An `ext-event` based event loop.
*
* This uses the [`event` PECL extension](https://pecl.php.net/package/event),
* that provides an interface to `libevent` library.
* `libevent` itself supports a number of system-specific backends (epoll, kqueue).
*
* This loop is known to work with PHP 5.4 through PHP 8+.
*
* @link https://pecl.php.net/package/event
*/
final class ExtEventLoop implements LoopInterface
{
private $eventBase;
private $futureTickQueue;
private $timerCallback;
private $timerEvents;
private $streamCallback;
private $readEvents = array();
private $writeEvents = array();
private $readListeners = array();
private $writeListeners = array();
private $readRefs = array();
private $writeRefs = array();
private $running;
private $signals;
private $signalEvents = array();
public function __construct()
{
if (!\class_exists('EventBase', false)) {
throw new BadMethodCallException('Cannot create ExtEventLoop, ext-event extension missing');
}
// support arbitrary file descriptors and not just sockets
// Windows only has limited file descriptor support, so do not require this (will fail otherwise)
// @link http://www.wangafu.net/~nickm/libevent-book/Ref2_eventbase.html#_setting_up_a_complicated_event_base
$config = new \EventConfig();
if (\DIRECTORY_SEPARATOR !== '\\') {
$config->requireFeatures(\EventConfig::FEATURE_FDS);
}
$this->eventBase = new EventBase($config);
$this->futureTickQueue = new FutureTickQueue();
$this->timerEvents = new SplObjectStorage();
$this->signals = new SignalsHandler();
$this->createTimerCallback();
$this->createStreamCallback();
}
public function __destruct()
{
// explicitly clear all references to Event objects to prevent SEGFAULTs on Windows
foreach ($this->timerEvents as $timer) {
$this->timerEvents->detach($timer);
}
$this->readEvents = array();
$this->writeEvents = array();
}
public function addReadStream($stream, $listener)
{
$key = (int) $stream;
if (isset($this->readListeners[$key])) {
return;
}
$event = new Event($this->eventBase, $stream, Event::PERSIST | Event::READ, $this->streamCallback);
$event->add();
$this->readEvents[$key] = $event;
$this->readListeners[$key] = $listener;
// ext-event does not increase refcount on stream resources for PHP 7+
// manually keep track of stream resource to prevent premature garbage collection
if (\PHP_VERSION_ID >= 70000) {
$this->readRefs[$key] = $stream;
}
}
public function addWriteStream($stream, $listener)
{
$key = (int) $stream;
if (isset($this->writeListeners[$key])) {
return;
}
$event = new Event($this->eventBase, $stream, Event::PERSIST | Event::WRITE, $this->streamCallback);
$event->add();
$this->writeEvents[$key] = $event;
$this->writeListeners[$key] = $listener;
// ext-event does not increase refcount on stream resources for PHP 7+
// manually keep track of stream resource to prevent premature garbage collection
if (\PHP_VERSION_ID >= 70000) {
$this->writeRefs[$key] = $stream;
}
}
public function removeReadStream($stream)
{
$key = (int) $stream;
if (isset($this->readEvents[$key])) {
$this->readEvents[$key]->free();
unset(
$this->readEvents[$key],
$this->readListeners[$key],
$this->readRefs[$key]
);
}
}
public function removeWriteStream($stream)
{
$key = (int) $stream;
if (isset($this->writeEvents[$key])) {
$this->writeEvents[$key]->free();
unset(
$this->writeEvents[$key],
$this->writeListeners[$key],
$this->writeRefs[$key]
);
}
}
public function addTimer($interval, $callback)
{
$timer = new Timer($interval, $callback, false);
$this->scheduleTimer($timer);
return $timer;
}
public function addPeriodicTimer($interval, $callback)
{
$timer = new Timer($interval, $callback, true);
$this->scheduleTimer($timer);
return $timer;
}
public function cancelTimer(TimerInterface $timer)
{
if ($this->timerEvents->contains($timer)) {
$this->timerEvents[$timer]->free();
$this->timerEvents->detach($timer);
}
}
public function futureTick($listener)
{
$this->futureTickQueue->add($listener);
}
public function addSignal($signal, $listener)
{
$this->signals->add($signal, $listener);
if (!isset($this->signalEvents[$signal])) {
$this->signalEvents[$signal] = Event::signal($this->eventBase, $signal, array($this->signals, 'call'));
$this->signalEvents[$signal]->add();
}
}
public function removeSignal($signal, $listener)
{
$this->signals->remove($signal, $listener);
if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
$this->signalEvents[$signal]->free();
unset($this->signalEvents[$signal]);
}
}
public function run()
{
$this->running = true;
while ($this->running) {
$this->futureTickQueue->tick();
$flags = EventBase::LOOP_ONCE;
if (!$this->running || !$this->futureTickQueue->isEmpty()) {
$flags |= EventBase::LOOP_NONBLOCK;
} elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count() && $this->signals->isEmpty()) {
break;
}
$this->eventBase->loop($flags);
}
}
public function stop()
{
$this->running = false;
}
/**
* Schedule a timer for execution.
*
* @param TimerInterface $timer
*/
private function scheduleTimer(TimerInterface $timer)
{
$flags = Event::TIMEOUT;
if ($timer->isPeriodic()) {
$flags |= Event::PERSIST;
}
$event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer);
$this->timerEvents[$timer] = $event;
$event->add($timer->getInterval());
}
/**
* Create a callback used as the target of timer events.
*
* A reference is kept to the callback for the lifetime of the loop
* to prevent "Cannot destroy active lambda function" fatal error from
* the event extension.
*/
private function createTimerCallback()
{
$timers = $this->timerEvents;
$this->timerCallback = function ($_, $__, $timer) use ($timers) {
\call_user_func($timer->getCallback(), $timer);
if (!$timer->isPeriodic() && $timers->contains($timer)) {
$this->cancelTimer($timer);
}
};
}
/**
* Create a callback used as the target of stream events.
*
* A reference is kept to the callback for the lifetime of the loop
* to prevent "Cannot destroy active lambda function" fatal error from
* the event extension.
*/
private function createStreamCallback()
{
$read =& $this->readListeners;
$write =& $this->writeListeners;
$this->streamCallback = function ($stream, $flags) use (&$read, &$write) {
$key = (int) $stream;
if (Event::READ === (Event::READ & $flags) && isset($read[$key])) {
\call_user_func($read[$key], $stream);
}
if (Event::WRITE === (Event::WRITE & $flags) && isset($write[$key])) {
\call_user_func($write[$key], $stream);
}
};
}
}
這里 reactphp 的 run() 與 workerman 的 loop() 不同的是使用了PHP的循環(huán)來做的,這可能也是為了更好的掌控循環(huán)中的一些事件的優(yōu)先級(jí),對(duì)循環(huán)的把控力度更高的緣故。
那么綜上所述,workerman 相比較 reactphp 在含C量上是要足很多的,尤其是主要用作常駐功能、事件監(jiān)聽的循環(huán)體上,這是最關(guān)鍵的一點(diǎn),這一點(diǎn)尤其體現(xiàn)在當(dāng)兩者都退化成同步阻塞的業(yè)務(wù)來說,workerman肯定是比reactphp的性能更高的;但如果使用了reactphp的異步體系的話,這個(gè)我沒有測(cè)試過,但憑經(jīng)驗(yàn)來說的話,我覺得reactphp可能會(huì)更好(沒有測(cè)試過,大膽猜測(cè)以下);另外swoole是使用C在底層做了很多異步的調(diào)度工作,讓用戶開發(fā)更像同步開發(fā),這點(diǎn)就不展開來說了。
秉著折騰的心態(tài),我自己實(shí)現(xiàn)了一個(gè)事件循環(huán)庫(kù),結(jié)合了reacphp和workerman二者,折騰了一下:
在做這個(gè)項(xiàng)目的的單元測(cè)試的時(shí)候,我也遇到了很多坑,有了很多心得,今天就先更新到這,改天再繼續(xù)說說這些坑和心得。
?? 更新于2020-05-28
使用PHP實(shí)現(xiàn)原生loop其實(shí)是件比較簡(jiǎn)單的事兒,這個(gè)loop里面需要干的事兒只有三件:
1和2分別可以使用PHP相關(guān)函數(shù)來實(shí)現(xiàn):
那么定時(shí)器是如何實(shí)現(xiàn)的呢?這里我用的是一個(gè)優(yōu)先隊(duì)列來保存(PHP的SPL中有相當(dāng)多有用的高效的數(shù)據(jù)結(jié)構(gòu),推薦大家多關(guān)注SPL,也就是PHP標(biāo)準(zhǔn)庫(kù)):
/** @var SplPriorityQueue 優(yōu)先隊(duì)列 */
protected SplPriorityQueue $_queue;
/** @inheritDoc */
public function __construct()
{
if(!extension_loaded('pcntl')){
throw new LoopException('not support: ext-pcntl');
}
parent::__construct();
$this->_queue = new SplPriorityQueue();
$this->_queue->setExtractFlags(SplPriorityQueue::EXTR_BOTH);
$this->_readFds = [];
$this->_writeFds = [];
}
/** 執(zhí)行 */
protected function _tick(): void
{
$count = $this->_queue->count();
while ($count--){
$data = $this->_queue->top();
$runTime = -$data['priority'];
$timerId = $data['data'];
/** @var Timer $data */
if($data = $this->_storage->get($timerId)){
$repeat = $data->getRepeat();
$callback = $data->getHandler();
$timeNow = \hrtime(true) * 1e-9;
if (($runTime - $timeNow) <= 0) {
$this->_queue->extract();
\call_user_func($callback);
if($repeat !== 0.0){
$nextTime = $timeNow + $repeat;
$this->_queue->insert($timerId, -$nextTime);
}else{
$this->delTimer($timerId);
}
}
}
}
}
畢竟添加的定時(shí)器可能比較多,每個(gè)定時(shí)器也在當(dāng)前進(jìn)程內(nèi)也會(huì)相互阻塞,所以肯定是需要有優(yōu)先級(jí)的,在每個(gè)循環(huán)周期里,都有一個(gè)最接近的定時(shí)器,越接近的,優(yōu)先級(jí)越高,具體代碼體現(xiàn)在上述 _tick() 內(nèi);
所以最后我的 loop() 實(shí)現(xiàn)如下:
/** @inheritDoc */
public function loop(): void
{
$this->_stopped = false;
while (!$this->_stopped) {
if(!$this->_readFds and !$this->_writeFds and !$this->_signals and $this->_storage->isEmpty()){
break;
}
\pcntl_signal_dispatch();
$writes = $this->_writeFds;
$reads = $this->_readFds;
$excepts = [];
foreach ($writes as $key => $socket) {
if (!isset($reads[$key]) && @\ftell($socket) === 0) {
$excepts[$key] = $socket;
}
}
if($writes or $reads or $excepts){
try {
@stream_select($reads, $writes, $excepts, 0,200000);
} catch (\Throwable $e) {}
foreach ($reads as $stream) {
$key = (int)$stream;
if (isset($this->_reads[$key])) {
($this->_reads[$key])($stream);
}
}
foreach ($writes as $stream) {
$key = (int)$stream;
if (isset($this->_writes[$key])) {
($this->_writes[$key])($stream);
}
}
}
$this->_tick();
}
}
stream_select() 這里一開始我使用了官方推薦的200000 微秒作為阻塞等待的時(shí)長(zhǎng),因?yàn)檫@樣可以降低CPU占用率,但是相當(dāng)于每一次loop都至少會(huì)阻塞200000微秒(也就是200ms),那么loop一周期的時(shí)長(zhǎng)就太長(zhǎng)了,單位秒內(nèi)loop的次數(shù)就大大降低,所以我就想試試如果改成0會(huì)如何;結(jié)果發(fā)現(xiàn)CPU的占用變得很高,雖然PHP內(nèi)部應(yīng)該是對(duì)其做了處理,但是還是很高。
這是什么原因呢?這個(gè)大概需要講到linux的時(shí)間分片算法相關(guān)的內(nèi)容了,這個(gè)地方的內(nèi)容我的另一篇分享里有提到 趣談程序演變的過程,大概意思就是如果你的這個(gè)程序不主動(dòng)出讓CPU,每次都會(huì)占用最大時(shí)間片;另外就是整個(gè)系統(tǒng)調(diào)度為了方便、高效,那么你的進(jìn)程分配在CPU-0上,下次繼續(xù)依然會(huì)在CPU-0上,那么就是我們通常聽到的“不能利用多核”(當(dāng)然,如果是單進(jìn)程,利用不利用多核其實(shí)意義不大);
為了減少CPU占用,可以使用主動(dòng)出讓CPU,每種語言的sleep函數(shù)或者方法都是可以出讓CPU的,所以我在每個(gè)loop周期最后都加入了usleep(0),這里雖然使用的是0的入?yún)ⅲ浅鲎孋PU依然有效,sleep(0)同理。
/** @inheritDoc */
public function loop(): void
{
$this->_stopped = false;
while (!$this->_stopped) {
if(!$this->_readFds and !$this->_writeFds and !$this->_signals and $this->_storage->isEmpty()){
break;
}
\pcntl_signal_dispatch();
$writes = $this->_writeFds;
$reads = $this->_readFds;
$excepts = [];
foreach ($writes as $key => $socket) {
if (!isset($reads[$key]) && @\ftell($socket) === 0) {
$excepts[$key] = $socket;
}
}
if($writes or $reads or $excepts){
try {
@stream_select($reads, $writes, $excepts, 0,0);
} catch (\Throwable $e) {}
foreach ($reads as $stream) {
$key = (int)$stream;
if (isset($this->_reads[$key])) {
($this->_reads[$key])($stream);
}
}
foreach ($writes as $stream) {
$key = (int)$stream;
if (isset($this->_writes[$key])) {
($this->_writes[$key])($stream);
}
}
}
$this->_tick();
usleep(0);
}
}
因?yàn)檫@個(gè)是原生PHP的loop,所以需要自行考慮循環(huán)內(nèi)的業(yè)務(wù)邏輯及先后順序;而其他的如event、ev,我直接使用了他們的loop,對(duì)比原生loop來說,會(huì)更簡(jiǎn)單理解一些,這里就不多提了,代碼里面都有。
在做測(cè)試的時(shí)候我還是沒有那么順利,也遇到了不少問題,發(fā)現(xiàn)了不少之前沒有在意的一些東西,先說結(jié)論:
這個(gè)我沒有刻意做一些性能測(cè)試,我在測(cè)試的時(shí)候使用的PHPunit,用例都是一致的(除了Ev和OpenSwoole有幾個(gè)用例特殊外),單憑運(yùn)行時(shí)候的進(jìn)度就能肉眼可見的觀察出區(qū)別,還是挺明顯的。
這個(gè)結(jié)論是因?yàn)槲易鰷y(cè)試的時(shí)候這個(gè)拓展是全程沒有報(bào)錯(cuò),并且很順利很高效的完成了,對(duì)于我來說,心智負(fù)擔(dān)約等于0,我個(gè)人是比較推薦使用這個(gè)拓展的。
這個(gè)結(jié)論主要是體現(xiàn)在如下:
use EvLoop as BaseEvLoop;
/** @var BaseEvLoop loop */
protected BaseEvLoop $_loop;
/** @inheritDoc */
public function __construct()
{
if(!extension_loaded('ev')){
throw new LoopException('ext-ev not support');
}
parent::__construct();
$this->_loop = new BaseEvLoop();
}
/** @inheritDoc */
public function addReadStream($stream, Closure $handler): void
{
if(is_resource($stream) and !isset($this->_reads[$key = (int)$stream])){
$event = $this->_loop->io($stream, Ev::READ, $handler);
$this->_reads[$key] = $event;
$this->_readFds[$key] = $stream;
}
}
這是一個(gè)我實(shí)現(xiàn)的注冊(cè)讀取流回調(diào)的方法,最開始我的實(shí)現(xiàn)方式不是調(diào)用 $this->_loop->io() ,而是如下的實(shí)現(xiàn)方式:
$event = new EvIo($stream,Ev::READ, $handler);
$event->start();
結(jié)果并沒有生效,事件回調(diào)并沒有被成功注冊(cè)進(jìn)入,也可能是我的使用方式錯(cuò)了。
在這個(gè)地方,注冊(cè)成功的回調(diào),我原本以為會(huì)將注冊(cè)的stream傳入我的回調(diào)函數(shù),但沒想到傳入的是一個(gè)EvIo對(duì)象;如下:
# 我以為
function(resource $stream){}
# 實(shí)際上
function(EvIo $stream){}
但這個(gè)其實(shí)還好,我可以通過$stream->fd
獲取對(duì)應(yīng)的resource流,但是沒想到這里卻又有一個(gè)坑;我在注冊(cè)的時(shí)候通過(int)$stream
獲得的resource id,并且將其id和resource以KV的形式保存在_readFds屬性中$this->_readFds[$key] = $stream;
;但沒有想到,回調(diào)傳入的EvIo對(duì)象中通過fd屬性獲取的resource id竟然和我注冊(cè)時(shí)候的id不對(duì)等,而且每次都不對(duì)等,相當(dāng)于每一次都是新的;這時(shí)候我就懵逼了;
但還好,我發(fā)現(xiàn)每次傳入的EvIo對(duì)象是同一個(gè),我索性以EvIo對(duì)象做判斷,通過spl_object_hash()獲取EvIo對(duì)象的id,將注冊(cè)代碼改為了如下:
/** @inheritDoc */
public function addReadStream($stream, Closure $handler): void
{
if(is_resource($stream) and !isset($this->_reads[$key = (int)$stream])){
$event = $this->_loop->io($stream, Ev::READ, $handler);
$this->_reads[$key] = $event;
$this->_readFds[spl_object_hash($event)] = $stream;
}
}
那么對(duì)應(yīng)的delReadStream也要做相應(yīng)的調(diào)整:
/**
* @param resource|EvIo $stream 為了兼容,所以可以傳入原resource,
* 也可以傳入回調(diào)入?yún)vIo對(duì)象
* @return void
*/
public function delReadStream($stream): void
{
if(is_resource($stream) and isset($this->_reads[$key = (int)$stream])){
/** @var EvIo $event */
$event = $this->_reads[$key];
$event->stop();
unset(
$this->_reads[$key],
$this->_readFds[spl_object_hash($event)]
);
}
if($stream instanceof EvIo and isset($this->_readFds[spl_object_hash($stream)])){
$stream->stop();
$key = (int)($this->_readFds[spl_object_hash($stream)]);
unset(
$this->_reads[$key],
$this->_readFds[spl_object_hash($stream)]
);
}
}
WriteStream同理,這里就不多說了。
另外還有一點(diǎn)不是很重要的區(qū)別,無延遲定時(shí)器在Ev下會(huì)比IO更快,你可以理解為每次循環(huán)都是最先執(zhí)行,優(yōu)先級(jí)最高;而原生loop和event都是在IO之后,有點(diǎn)類似于每次循環(huán)的結(jié)束的時(shí)候觸發(fā)
Swoole在測(cè)試的時(shí)候報(bào)了挺多錯(cuò)誤的,感覺和PHPunit有一些沖突,最明顯的是如下這個(gè)錯(cuò)誤:
PHPUnit\Framework\Exception: PHP Fatal error: Uncaught Exception: Serialization of 'Closure' is not allowed
即便使用PHPunit的 @backupStaticAttributes 和 @backupGlobals 依然存在;我沒有去看源碼,個(gè)人理解可能是Swoole底層利用了一些全局的靜態(tài)導(dǎo)致了這個(gè),感興趣的可以去看看源碼。
OpenSwoole反而比Swoole好像要少一些,但也可能是我是用方式不同導(dǎo)致的,但也有一些比較令我意外的地方;在OpenSwoole的event-loop中,我利用了Event::defer嵌套Timer::tick做了一些工作,實(shí)現(xiàn)了無延遲單次定時(shí)器、無延遲循環(huán)定時(shí)器等:
Event::defer(
function() use($id, &$timerId, $repeat, $callback){
if($timerId = Timer::tick($repeat, $callback))
$this->_storage->set($id, $timerId);
}
$callback();
}
);
在一個(gè)測(cè)試信號(hào)的用例中:
/** @runInSeparateProcess 測(cè)試信號(hào)相應(yīng) */
public function testSignalResponse()
{
if (
!function_exists('posix_kill') or
!function_exists('posix_getpid')
) {
$this->markTestSkipped('Signal test skipped because functions "posix_kill" and "posix_getpid" are missing.');
}
$count1 = $count2 = 0;
$this->loop->addSignal(12, function () use (&$count1) {
$count1 ++;
$this->loop->delSignal(12);
$this->loop->destroy();
});
$this->loop->addSignal(10, function () use (&$count2) {
$count2 ++;
$this->loop->delSignal(10);
$this->loop->destroy();
});
$this->loop->addTimer(0.0,0.0,function () {
posix_kill(posix_getpid(), 10);
});
# 猜測(cè)是因?yàn)榘l(fā)送信號(hào)會(huì)被轉(zhuǎn)為異步的緣故,所以當(dāng)去除這個(gè)timer時(shí),則不滿足預(yù)期
# 但是不可以使用Event::defer,這里的無延遲定時(shí)器使用的就是Event::defer
// $this->loop->addTimer(0.0,0.0, function (){});
$this->loop->addTimer($this->tickTimeout,0.0, function (){});
$this->loop->loop();
$this->assertEquals(0, $count1);
$this->assertEquals(1, $count2);
}
如果上述代碼我注釋掉$this->loop->addTimer($this->tickTimeout,0.0, function (){});
這行代碼,結(jié)果便達(dá)不到預(yù)期;如果使用$this->loop->addTimer(0.0,0.0, function (){});
也同樣達(dá)不到預(yù)期,這里$this->loop->addTimer(0.0,0.0, function (){});
可以等同看作是 Event::defer,也就是說必須有一個(gè)基于 Timer::tick 的方法掛在最后,這樣的結(jié)果才符合預(yù)期;
我的猜測(cè)是在Swoole/OpenSwoole環(huán)境下,信號(hào)的發(fā)送和接收是會(huì)被轉(zhuǎn)為異步操作,交給了swoole的輔助線程的緣故,因?yàn)閟woole中還有一條輔助線程用來調(diào)度協(xié)程,也會(huì)包含一個(gè)loop,因?yàn)檫@個(gè)信號(hào)被轉(zhuǎn)為了異步,也就是說當(dāng)我的loop循環(huán)到第二圈的時(shí)候,信號(hào)還并沒有通知到我這里,我因?yàn)槭褂昧薲efer,在第二個(gè)循環(huán)中就將當(dāng)前循環(huán)destroy()了,自然達(dá)不到預(yù)期,但當(dāng)我使用了一個(gè)Timer::tick將我的loop掛起了不止一圈,那么這時(shí)候可能就收到了異步的信號(hào)通知,那么自然也就觸發(fā)了我的回調(diào),自然也就符合了預(yù)期。
還有一個(gè)問題,就是因?yàn)閟woole沒有無延遲的定時(shí)器提供,所以我使用了Event::defer來充當(dāng)這么一個(gè)定時(shí)器,但這里會(huì)存在一個(gè)問題,無延遲的定時(shí)器可能在生產(chǎn)環(huán)境中產(chǎn)生不同的業(yè)務(wù)邏輯,同時(shí)存在好幾個(gè)不同的無延遲定時(shí)器,但這里使用了Event::defer只能注冊(cè)一個(gè)回調(diào)方法,后注冊(cè)的會(huì)覆蓋先注冊(cè)的,所以需要慎用。
這個(gè)swoole/openswoole我目前還沒有測(cè)試完,之后測(cè)試了有其他心得的話,會(huì)繼續(xù)更新,畢竟如果能夠使用一些協(xié)程,也是比較快樂的事兒,繼續(xù)折騰!
這里繼續(xù)提一句,我的 workbunny/event-loop -1.0.0 是生產(chǎn)可用的,做好了測(cè)試覆蓋的;
隨后 1.x 我會(huì)加入 openswoole,當(dāng)然我得先把它測(cè)試通過才可以;
歡迎star!PR!issue!
?? 更新于2020-05-30
Event::tick 可以多次創(chuàng)建不會(huì)覆蓋,也就是不影響無延遲觸發(fā)器和無延遲定時(shí)器得多次創(chuàng)建
之前測(cè)試的信號(hào)和Timer及Event相關(guān)的猜測(cè)應(yīng)該是不存在的,是可以正常調(diào)用,出現(xiàn)之前所述的問題的原因是在測(cè)試用例時(shí)并沒有注意Swoole在一個(gè)循環(huán)周期內(nèi)的事件優(yōu)先級(jí),這個(gè)需要大家關(guān)注
?? 更新于2020-06-02
不同的event-loop在效率上的影響除了之前表達(dá)過的“含C量”之外,還包含了不同語言/不同數(shù)據(jù)結(jié)構(gòu)所帶來的一些時(shí)間復(fù)雜度的影響:
workerman-eventloop(開啟event拓展下)
reactphp-eventloop(開啟event拓展下)
第一點(diǎn)的比較,體現(xiàn)了原生PHP和C在循環(huán)上的差異
reactphp的第二點(diǎn)使用了PHP-SPL的優(yōu)先隊(duì)列來儲(chǔ)存future回調(diào),優(yōu)先級(jí)隊(duì)列本質(zhì)上是堆,時(shí)間復(fù)雜度會(huì)在向優(yōu)先隊(duì)列插入數(shù)據(jù)時(shí)候體現(xiàn)
libevent、libev等C庫(kù)提供了Timer,支持傳入為0的delay參數(shù),也就相當(dāng)于在下一個(gè)循環(huán)周期內(nèi)立即執(zhí)行Timer注冊(cè)的回調(diào)函數(shù),使用Timer.delay=0替代future即可
?? 更新于2023-05-11
Event::tick 可以多次創(chuàng)建不會(huì)覆蓋,也就是不影響無延遲觸發(fā)器和無延遲定時(shí)器得多次創(chuàng)建
之前測(cè)試的信號(hào)和Timer及Event相關(guān)的猜測(cè)應(yīng)該是不存在的,是可以正常調(diào)用,出現(xiàn)之前所述的問題的原因是在測(cè)試用例時(shí)并沒有注意Swoole在一個(gè)循環(huán)周期內(nèi)的事件優(yōu)先級(jí),這個(gè)需要大家關(guān)注
?? 更新于2020-06-02
感謝分享!