国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

event-loop的一些心得體會(huì)

chaz6chez

?? 最新更新于2020-06-02

前言

最早接觸reactor模型的時(shí)候,應(yīng)該是在參與一個(gè)叫zanphp項(xiàng)目的時(shí)候,他是一個(gè)類似swoole的php拓展項(xiàng)目,當(dāng)然它們之間的故事我就不多說了,也有一些沖突和迷茫;在那個(gè)時(shí)間段的PHP發(fā)展還是很蓬勃向上的,那時(shí)候的滴滴、有贊、百度都有很多很多PHP項(xiàng)目,那時(shí)候的原生PHP有許多許多的瓶頸,所以國(guó)內(nèi)那時(shí)候涌現(xiàn)了很多使用C來為PHP加速的開發(fā)者。
隨著PHP慢慢發(fā)展,PHP的特性越來越豐富,性能也越來越好,而PECL庫(kù)里的拓展也經(jīng)歷了這么多年的洗禮和沖刷,越來越穩(wěn)固?,F(xiàn)如今更多的PHP開發(fā)者圍繞著原生PHP做業(yè)務(wù),其實(shí)我覺得這反而是一個(gè)好現(xiàn)象,專業(yè)的人做專業(yè)的事,更多熱愛它的人愿意留下做貢獻(xiàn),社區(qū)雖然沒有像那段時(shí)間一樣的向四面八方高速發(fā)展,但體現(xiàn)出來的是更有方向感的一種進(jìn)步。
在這樣一種狀況下,外加上我接觸了比較多其他的語言和項(xiàng)目,激發(fā)了我想利用現(xiàn)有的擴(kuò)展結(jié)合原生PHP去做一些看起來厲害的、用起來騷氣的一些庫(kù)或者組件,做一些可能重復(fù)造輪子的事兒;當(dāng)然,一方面是希望盡可能的做一些新輪子,另一方面也是希望能夠通過實(shí)踐,更深的理解某些知識(shí)。

開始

我打算做的是一個(gè)輕量的任務(wù)調(diào)度服務(wù),原本計(jì)劃是Golang做開發(fā),在業(yè)內(nèi)大部分人的評(píng)價(jià)來說,Golang像是一個(gè)高級(jí)的PHP/高級(jí)Python;其實(shí)用Golang做一個(gè)任務(wù)調(diào)度服務(wù)來說是件比較簡(jiǎn)單的事兒,而且市面上也有比較多的調(diào)度服務(wù),這其實(shí)是一個(gè)重復(fù)輪子的事兒,考慮到這個(gè)情況,我思考了一下,打算先用PHP實(shí)現(xiàn)一個(gè)這樣的服務(wù)。

通常來說PHP語言都圍繞著PHP-FPM來做的開發(fā),畢竟PHP業(yè)內(nèi)最工業(yè)化的架構(gòu)就是LNMP/LAMP,但是這就不符合“輕量”這一特性了,所以我把目光鎖定在了workerman、amphp、reactphp上;但為了深入了解這些PHP中優(yōu)秀的reacto模型框架,我決定自己擼一個(gè)event-loop;

研究

我分別研究了 walkor/workermanreactphp/event-loop,發(fā)現(xiàn)了一些比較有趣的事兒;

為什么workerman的性能會(huì)高于reactphp呢?

  • 一方面要看這個(gè)PHP框架的C含量有多少了,越多性能就越高。
    這樣的思維普遍存在在各種語言/框架中,比如PHP的YAF、Phalcon、swoole等,比如Python早期的cpython、numpy等;畢竟C語言作為祖師爺?shù)拇嬖冢N近系統(tǒng),對(duì)于內(nèi)存、系統(tǒng)的調(diào)用來的更直接,只要編碼足夠優(yōu)秀,性能就可以足夠優(yōu)秀。

  • 一方面要關(guān)注開發(fā)的模式
    通常來說,我們選用一款開發(fā)框架都是用于開發(fā)業(yè)務(wù)的,這里面會(huì)面對(duì)和使用各種各樣已經(jīng)存在的輪子,比如PDO、composer組件等,這些組件/拓展功能的存在已經(jīng)有一定的歷史了,他們從過去到現(xiàn)在的發(fā)展也大多聚焦在blocking-IO的模式上,也就是同步阻塞的模式,因?yàn)檫@種模式更簡(jiǎn)單直接,每一行的代碼都是順序進(jìn)行下去,好掌控,也好排查,這樣的開發(fā)模式可以讓程序員降低不少的心智負(fù)擔(dān),聚焦在業(yè)務(wù)上。

我們回過頭聚焦 reactphp和workerman;我們大部分的開發(fā)模式像上述說的,我們都會(huì)聚焦在BIO的模式上,一方面是輪子是這么做的,一方面是這樣的開發(fā)速度更快更直接;拋開進(jìn)程不說,如果我們?cè)趩芜M(jìn)程下使用reactor模型,那么這樣的event-loop就會(huì)退化成和PHP-FPM一樣的阻塞等待的程序,workerman是如何做的呢?多進(jìn)程,多開了比較多的進(jìn)程來并行處理業(yè)務(wù),也利用了linux的端口復(fù)用(SO_REUSEADDR、SO_REUSEPORT);reactphp利用的是異步編程的方式,盡可能地不阻塞event-loop;那么這里reactphp的代價(jià)就是需要為這套編程方式實(shí)現(xiàn)許許多多的異步客戶端,做很多輪子的工作,這里包含解決回調(diào)地獄的 reactphp/promise 等,因?yàn)橐坏┳枞薳vent-loop,它便會(huì)退化。

除此之外,還有提到的含C量上,以下我會(huì)用代碼解釋這個(gè)含C量具體提現(xiàn)在哪兒:

  • workerman的event-loop,以ext-event舉例
<?php 
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    有個(gè)鬼<42765633@qq.com>
 * @copyright 有個(gè)鬼<42765633@qq.com>
 * @link      http://wtbis.cn/
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */
namespace Workerman\Events;

use Workerman\Worker;

/**
 * libevent eventloop
 */
class Event implements EventInterface
{
    /**
     * Event base.
     * @var object
     */
    protected $_eventBase = null;

    /**
     * All listeners for read/write event.
     * @var array
     */
    protected $_allEvents = array();

    /**
     * Event listeners of signal.
     * @var array
     */
    protected $_eventSignal = array();

    /**
     * All timer event listeners.
     * [func, args, event, flag, time_interval]
     * @var array
     */
    protected $_eventTimer = array();

    /**
     * Timer id.
     * @var int
     */
    protected static $_timerId = 1;

    /**
     * construct
     * @return void
     */
    public function __construct()
    {
        if (\class_exists('\\\\EventBase', false)) {
            $class_name = '\\\\EventBase';
        } else {
            $class_name = '\EventBase';
        }
        $this->_eventBase = new $class_name();
    }

    /**
     * @see EventInterface::add()
     */
    public function add($fd, $flag, $func, $args=array())
    {
        if (\class_exists('\\\\Event', false)) {
            $class_name = '\\\\Event';
        } else {
            $class_name = '\Event';
        }
        switch ($flag) {
            case self::EV_SIGNAL:

                $fd_key = (int)$fd;
                $event = $class_name::signal($this->_eventBase, $fd, $func);
                if (!$event||!$event->add()) {
                    return false;
                }
                $this->_eventSignal[$fd_key] = $event;
                return true;

            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:

                $param = array($func, (array)$args, $flag, $fd, self::$_timerId);
                $event = new $class_name($this->_eventBase, -1, $class_name::TIMEOUT|$class_name::PERSIST, array($this, "timerCallback"), $param);
                if (!$event||!$event->addTimer($fd)) {
                    return false;
                }
                $this->_eventTimer[self::$_timerId] = $event;
                return self::$_timerId++;

            default :
                $fd_key = (int)$fd;
                $real_flag = $flag === self::EV_READ ? $class_name::READ | $class_name::PERSIST : $class_name::WRITE | $class_name::PERSIST;
                $event = new $class_name($this->_eventBase, $fd, $real_flag, $func, $fd);
                if (!$event||!$event->add()) {
                    return false;
                }
                $this->_allEvents[$fd_key][$flag] = $event;
                return true;
        }
    }

    /**
     * @see Events\EventInterface::del()
     */
    public function del($fd, $flag)
    {
        switch ($flag) {

            case self::EV_READ:
            case self::EV_WRITE:

                $fd_key = (int)$fd;
                if (isset($this->_allEvents[$fd_key][$flag])) {
                    $this->_allEvents[$fd_key][$flag]->del();
                    unset($this->_allEvents[$fd_key][$flag]);
                }
                if (empty($this->_allEvents[$fd_key])) {
                    unset($this->_allEvents[$fd_key]);
                }
                break;

            case  self::EV_SIGNAL:
                $fd_key = (int)$fd;
                if (isset($this->_eventSignal[$fd_key])) {
                    $this->_eventSignal[$fd_key]->del();
                    unset($this->_eventSignal[$fd_key]);
                }
                break;

            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                if (isset($this->_eventTimer[$fd])) {
                    $this->_eventTimer[$fd]->del();
                    unset($this->_eventTimer[$fd]);
                }
                break;
        }
        return true;
    }

    /**
     * Timer callback.
     * @param int|null $fd
     * @param int $what
     * @param int $timer_id
     */
    public function timerCallback($fd, $what, $param)
    {
        $timer_id = $param[4];

        if ($param[2] === self::EV_TIMER_ONCE) {
            $this->_eventTimer[$timer_id]->del();
            unset($this->_eventTimer[$timer_id]);
        }

        try {
            \call_user_func_array($param[0], $param[1]);
        } catch (\Exception $e) {
            Worker::stopAll(250, $e);
        } catch (\Error $e) {
            Worker::stopAll(250, $e);
        }
    }

    /**
     * @see Events\EventInterface::clearAllTimer() 
     * @return void
     */
    public function clearAllTimer()
    {
        foreach ($this->_eventTimer as $event) {
            $event->del();
        }
        $this->_eventTimer = array();
    }

    /**
     * @see EventInterface::loop()
     */
    public function loop()
    {
        $this->_eventBase->loop();
    }

    /**
     * Destroy loop.
     *
     * @return void
     */
    public function destroy()
    {
        $this->_eventBase->exit();
    }

    /**
     * Get timer count.
     *
     * @return integer
     */
    public function getTimerCount()
    {
        return \count($this->_eventTimer);
    }
}

以上的代碼中以 loop() 這段代碼來說,workerman直接將事件的循環(huán)交給了ext-event拓展,因?yàn)閑xt-event底層使用的是libevent庫(kù),是一個(gè)C語言的事件循環(huán)庫(kù),你可以理解為把一個(gè)循環(huán)體交給了C語言。
C語言的循環(huán)肯定是比PHP的循環(huán)效率更高的,在同等時(shí)間單位下,循環(huán)的次數(shù)肯定是比PHP要高不少的。

  • reactphp的event-loop,以ext-event舉例
<?php

namespace React\EventLoop;

use BadMethodCallException;
use Event;
use EventBase;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Timer\Timer;
use SplObjectStorage;

/**
 * An `ext-event` based event loop.
 *
 * This uses the [`event` PECL extension](https://pecl.php.net/package/event),
 * that provides an interface to `libevent` library.
 * `libevent` itself supports a number of system-specific backends (epoll, kqueue).
 *
 * This loop is known to work with PHP 5.4 through PHP 8+.
 *
 * @link https://pecl.php.net/package/event
 */
final class ExtEventLoop implements LoopInterface
{
    private $eventBase;
    private $futureTickQueue;
    private $timerCallback;
    private $timerEvents;
    private $streamCallback;
    private $readEvents = array();
    private $writeEvents = array();
    private $readListeners = array();
    private $writeListeners = array();
    private $readRefs = array();
    private $writeRefs = array();
    private $running;
    private $signals;
    private $signalEvents = array();

    public function __construct()
    {
        if (!\class_exists('EventBase', false)) {
            throw new BadMethodCallException('Cannot create ExtEventLoop, ext-event extension missing');
        }

        // support arbitrary file descriptors and not just sockets
        // Windows only has limited file descriptor support, so do not require this (will fail otherwise)
        // @link http://www.wangafu.net/~nickm/libevent-book/Ref2_eventbase.html#_setting_up_a_complicated_event_base
        $config = new \EventConfig();
        if (\DIRECTORY_SEPARATOR !== '\\') {
            $config->requireFeatures(\EventConfig::FEATURE_FDS);
        }

        $this->eventBase = new EventBase($config);
        $this->futureTickQueue = new FutureTickQueue();
        $this->timerEvents = new SplObjectStorage();
        $this->signals = new SignalsHandler();

        $this->createTimerCallback();
        $this->createStreamCallback();
    }

    public function __destruct()
    {
        // explicitly clear all references to Event objects to prevent SEGFAULTs on Windows
        foreach ($this->timerEvents as $timer) {
            $this->timerEvents->detach($timer);
        }

        $this->readEvents = array();
        $this->writeEvents = array();
    }

    public function addReadStream($stream, $listener)
    {
        $key = (int) $stream;
        if (isset($this->readListeners[$key])) {
            return;
        }

        $event = new Event($this->eventBase, $stream, Event::PERSIST | Event::READ, $this->streamCallback);
        $event->add();
        $this->readEvents[$key] = $event;
        $this->readListeners[$key] = $listener;

        // ext-event does not increase refcount on stream resources for PHP 7+
        // manually keep track of stream resource to prevent premature garbage collection
        if (\PHP_VERSION_ID >= 70000) {
            $this->readRefs[$key] = $stream;
        }
    }

    public function addWriteStream($stream, $listener)
    {
        $key = (int) $stream;
        if (isset($this->writeListeners[$key])) {
            return;
        }

        $event = new Event($this->eventBase, $stream, Event::PERSIST | Event::WRITE, $this->streamCallback);
        $event->add();
        $this->writeEvents[$key] = $event;
        $this->writeListeners[$key] = $listener;

        // ext-event does not increase refcount on stream resources for PHP 7+
        // manually keep track of stream resource to prevent premature garbage collection
        if (\PHP_VERSION_ID >= 70000) {
            $this->writeRefs[$key] = $stream;
        }
    }

    public function removeReadStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->readEvents[$key])) {
            $this->readEvents[$key]->free();
            unset(
                $this->readEvents[$key],
                $this->readListeners[$key],
                $this->readRefs[$key]
            );
        }
    }

    public function removeWriteStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->writeEvents[$key])) {
            $this->writeEvents[$key]->free();
            unset(
                $this->writeEvents[$key],
                $this->writeListeners[$key],
                $this->writeRefs[$key]
            );
        }
    }

    public function addTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, false);

        $this->scheduleTimer($timer);

        return $timer;
    }

    public function addPeriodicTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, true);

        $this->scheduleTimer($timer);

        return $timer;
    }

    public function cancelTimer(TimerInterface $timer)
    {
        if ($this->timerEvents->contains($timer)) {
            $this->timerEvents[$timer]->free();
            $this->timerEvents->detach($timer);
        }
    }

    public function futureTick($listener)
    {
        $this->futureTickQueue->add($listener);
    }

    public function addSignal($signal, $listener)
    {
        $this->signals->add($signal, $listener);

        if (!isset($this->signalEvents[$signal])) {
            $this->signalEvents[$signal] = Event::signal($this->eventBase, $signal, array($this->signals, 'call'));
            $this->signalEvents[$signal]->add();
        }
    }

    public function removeSignal($signal, $listener)
    {
        $this->signals->remove($signal, $listener);

        if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
            $this->signalEvents[$signal]->free();
            unset($this->signalEvents[$signal]);
        }
    }

    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();

            $flags = EventBase::LOOP_ONCE;
            if (!$this->running || !$this->futureTickQueue->isEmpty()) {
                $flags |= EventBase::LOOP_NONBLOCK;
            } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count() && $this->signals->isEmpty()) {
                break;
            }

            $this->eventBase->loop($flags);
        }
    }

    public function stop()
    {
        $this->running = false;
    }

    /**
     * Schedule a timer for execution.
     *
     * @param TimerInterface $timer
     */
    private function scheduleTimer(TimerInterface $timer)
    {
        $flags = Event::TIMEOUT;

        if ($timer->isPeriodic()) {
            $flags |= Event::PERSIST;
        }

        $event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer);
        $this->timerEvents[$timer] = $event;

        $event->add($timer->getInterval());
    }

    /**
     * Create a callback used as the target of timer events.
     *
     * A reference is kept to the callback for the lifetime of the loop
     * to prevent "Cannot destroy active lambda function" fatal error from
     * the event extension.
     */
    private function createTimerCallback()
    {
        $timers = $this->timerEvents;
        $this->timerCallback = function ($_, $__, $timer) use ($timers) {
            \call_user_func($timer->getCallback(), $timer);

            if (!$timer->isPeriodic() && $timers->contains($timer)) {
                $this->cancelTimer($timer);
            }
        };
    }

    /**
     * Create a callback used as the target of stream events.
     *
     * A reference is kept to the callback for the lifetime of the loop
     * to prevent "Cannot destroy active lambda function" fatal error from
     * the event extension.
     */
    private function createStreamCallback()
    {
        $read =& $this->readListeners;
        $write =& $this->writeListeners;
        $this->streamCallback = function ($stream, $flags) use (&$read, &$write) {
            $key = (int) $stream;

            if (Event::READ === (Event::READ & $flags) && isset($read[$key])) {
                \call_user_func($read[$key], $stream);
            }

            if (Event::WRITE === (Event::WRITE & $flags) && isset($write[$key])) {
                \call_user_func($write[$key], $stream);
            }
        };
    }
}

這里 reactphp 的 run() 與 workerman 的 loop() 不同的是使用了PHP的循環(huán)來做的,這可能也是為了更好的掌控循環(huán)中的一些事件的優(yōu)先級(jí),對(duì)循環(huán)的把控力度更高的緣故。

那么綜上所述,workerman 相比較 reactphp 在含C量上是要足很多的,尤其是主要用作常駐功能、事件監(jiān)聽的循環(huán)體上,這是最關(guān)鍵的一點(diǎn),這一點(diǎn)尤其體現(xiàn)在當(dāng)兩者都退化成同步阻塞的業(yè)務(wù)來說,workerman肯定是比reactphp的性能更高的;但如果使用了reactphp的異步體系的話,這個(gè)我沒有測(cè)試過,但憑經(jīng)驗(yàn)來說的話,我覺得reactphp可能會(huì)更好(沒有測(cè)試過,大膽猜測(cè)以下);另外swoole是使用C在底層做了很多異步的調(diào)度工作,讓用戶開發(fā)更像同步開發(fā),這點(diǎn)就不展開來說了。

自行實(shí)現(xiàn)event-loop

秉著折騰的心態(tài),我自己實(shí)現(xiàn)了一個(gè)事件循環(huán)庫(kù),結(jié)合了reacphp和workerman二者,折騰了一下:

workbunny/event-loop

在做這個(gè)項(xiàng)目的的單元測(cè)試的時(shí)候,我也遇到了很多坑,有了很多心得,今天就先更新到這,改天再繼續(xù)說說這些坑和心得。

?? 更新于2020-05-28

PHP原生loop

使用PHP實(shí)現(xiàn)原生loop其實(shí)是件比較簡(jiǎn)單的事兒,這個(gè)loop里面需要干的事兒只有三件:

  1. 監(jiān)聽系統(tǒng)信號(hào)
  2. 監(jiān)聽流信息
  3. 定時(shí)器

1和2分別可以使用PHP相關(guān)函數(shù)來實(shí)現(xiàn):

  1. pcntl_signal_dispatch(),用于監(jiān)聽信號(hào)
  2. stream_select(),用于獲取流數(shù)據(jù)

那么定時(shí)器是如何實(shí)現(xiàn)的呢?這里我用的是一個(gè)優(yōu)先隊(duì)列來保存(PHP的SPL中有相當(dāng)多有用的高效的數(shù)據(jù)結(jié)構(gòu),推薦大家多關(guān)注SPL,也就是PHP標(biāo)準(zhǔn)庫(kù)):

    /** @var SplPriorityQueue 優(yōu)先隊(duì)列 */
    protected SplPriorityQueue $_queue;

    /** @inheritDoc */
    public function __construct()
    {
        if(!extension_loaded('pcntl')){
            throw new LoopException('not support: ext-pcntl');
        }
        parent::__construct();

        $this->_queue = new SplPriorityQueue();
        $this->_queue->setExtractFlags(SplPriorityQueue::EXTR_BOTH);
        $this->_readFds = [];
        $this->_writeFds = [];
    }

    /** 執(zhí)行 */
    protected function _tick(): void
    {
        $count = $this->_queue->count();
        while ($count--){
            $data = $this->_queue->top();
            $runTime = -$data['priority'];
            $timerId = $data['data'];
            /** @var Timer $data */
            if($data = $this->_storage->get($timerId)){
                $repeat = $data->getRepeat();
                $callback = $data->getHandler();
                $timeNow = \hrtime(true) * 1e-9;
                if (($runTime - $timeNow) <= 0) {
                    $this->_queue->extract();
                    \call_user_func($callback);
                    if($repeat !== 0.0){
                        $nextTime = $timeNow + $repeat;
                        $this->_queue->insert($timerId, -$nextTime);
                    }else{
                        $this->delTimer($timerId);
                    }
                }
            }
        }
    }

畢竟添加的定時(shí)器可能比較多,每個(gè)定時(shí)器也在當(dāng)前進(jìn)程內(nèi)也會(huì)相互阻塞,所以肯定是需要有優(yōu)先級(jí)的,在每個(gè)循環(huán)周期里,都有一個(gè)最接近的定時(shí)器,越接近的,優(yōu)先級(jí)越高,具體代碼體現(xiàn)在上述 _tick() 內(nèi);

所以最后我的 loop() 實(shí)現(xiàn)如下:

    /** @inheritDoc */
    public function loop(): void
    {
        $this->_stopped = false;

        while (!$this->_stopped) {
            if(!$this->_readFds and !$this->_writeFds and !$this->_signals and $this->_storage->isEmpty()){
                break;
            }

            \pcntl_signal_dispatch();

            $writes = $this->_writeFds;
            $reads = $this->_readFds;
            $excepts = [];
            foreach ($writes as $key => $socket) {
                if (!isset($reads[$key]) && @\ftell($socket) === 0) {
                    $excepts[$key] = $socket;
                }
            }

            if($writes or $reads or $excepts){
                try {
                    @stream_select($reads, $writes, $excepts, 0,200000);
                } catch (\Throwable $e) {}

                foreach ($reads as $stream) {
                    $key = (int)$stream;
                    if (isset($this->_reads[$key])) {
                        ($this->_reads[$key])($stream);
                    }
                }

                foreach ($writes as $stream) {
                    $key = (int)$stream;
                    if (isset($this->_writes[$key])) {
                        ($this->_writes[$key])($stream);
                    }
                }
            }

            $this->_tick();
        }
    }

stream_select() 這里一開始我使用了官方推薦的200000 微秒作為阻塞等待的時(shí)長(zhǎng),因?yàn)檫@樣可以降低CPU占用率,但是相當(dāng)于每一次loop都至少會(huì)阻塞200000微秒(也就是200ms),那么loop一周期的時(shí)長(zhǎng)就太長(zhǎng)了,單位秒內(nèi)loop的次數(shù)就大大降低,所以我就想試試如果改成0會(huì)如何;結(jié)果發(fā)現(xiàn)CPU的占用變得很高,雖然PHP內(nèi)部應(yīng)該是對(duì)其做了處理,但是還是很高。

這是什么原因呢?這個(gè)大概需要講到linux的時(shí)間分片算法相關(guān)的內(nèi)容了,這個(gè)地方的內(nèi)容我的另一篇分享里有提到 趣談程序演變的過程,大概意思就是如果你的這個(gè)程序不主動(dòng)出讓CPU,每次都會(huì)占用最大時(shí)間片;另外就是整個(gè)系統(tǒng)調(diào)度為了方便、高效,那么你的進(jìn)程分配在CPU-0上,下次繼續(xù)依然會(huì)在CPU-0上,那么就是我們通常聽到的“不能利用多核”(當(dāng)然,如果是單進(jìn)程,利用不利用多核其實(shí)意義不大);

為了減少CPU占用,可以使用主動(dòng)出讓CPU,每種語言的sleep函數(shù)或者方法都是可以出讓CPU的,所以我在每個(gè)loop周期最后都加入了usleep(0),這里雖然使用的是0的入?yún)ⅲ浅鲎孋PU依然有效,sleep(0)同理。

    /** @inheritDoc */
    public function loop(): void
    {
        $this->_stopped = false;

        while (!$this->_stopped) {
            if(!$this->_readFds and !$this->_writeFds and !$this->_signals and $this->_storage->isEmpty()){
                break;
            }

            \pcntl_signal_dispatch();

            $writes = $this->_writeFds;
            $reads = $this->_readFds;
            $excepts = [];
            foreach ($writes as $key => $socket) {
                if (!isset($reads[$key]) && @\ftell($socket) === 0) {
                    $excepts[$key] = $socket;
                }
            }

            if($writes or $reads or $excepts){
                try {
                    @stream_select($reads, $writes, $excepts, 0,0);
                } catch (\Throwable $e) {}

                foreach ($reads as $stream) {
                    $key = (int)$stream;
                    if (isset($this->_reads[$key])) {
                        ($this->_reads[$key])($stream);
                    }
                }

                foreach ($writes as $stream) {
                    $key = (int)$stream;
                    if (isset($this->_writes[$key])) {
                        ($this->_writes[$key])($stream);
                    }
                }
            }

            $this->_tick();

            usleep(0);
        }
    }

因?yàn)檫@個(gè)是原生PHP的loop,所以需要自行考慮循環(huán)內(nèi)的業(yè)務(wù)邏輯及先后順序;而其他的如event、ev,我直接使用了他們的loop,對(duì)比原生loop來說,會(huì)更簡(jiǎn)單理解一些,這里就不多提了,代碼里面都有。

測(cè)試

在做測(cè)試的時(shí)候我還是沒有那么順利,也遇到了不少問題,發(fā)現(xiàn)了不少之前沒有在意的一些東西,先說結(jié)論:

  1. PHP實(shí)現(xiàn)的原生loop要慢挺多的;
  2. Event拓展還是最穩(wěn)健的一個(gè)選擇;
  3. Ev拓展有一點(diǎn)小坑,可能是我固有思維導(dǎo)致的;
  4. Swoole/OpenSwoole個(gè)人覺得還是不穩(wěn)當(dāng),最好有閱讀C源代碼的能力;

PHP實(shí)現(xiàn)的原生loop要慢挺多的

這個(gè)我沒有刻意做一些性能測(cè)試,我在測(cè)試的時(shí)候使用的PHPunit,用例都是一致的(除了Ev和OpenSwoole有幾個(gè)用例特殊外),單憑運(yùn)行時(shí)候的進(jìn)度就能肉眼可見的觀察出區(qū)別,還是挺明顯的。

Event拓展還是最穩(wěn)健的一個(gè)選擇

這個(gè)結(jié)論是因?yàn)槲易鰷y(cè)試的時(shí)候這個(gè)拓展是全程沒有報(bào)錯(cuò),并且很順利很高效的完成了,對(duì)于我來說,心智負(fù)擔(dān)約等于0,我個(gè)人是比較推薦使用這個(gè)拓展的。

Ev拓展有一點(diǎn)小坑

這個(gè)結(jié)論主要是體現(xiàn)在如下:

use EvLoop as BaseEvLoop;

    /** @var BaseEvLoop loop */
    protected BaseEvLoop $_loop;

    /** @inheritDoc */
    public function __construct()
    {
        if(!extension_loaded('ev')){
            throw new LoopException('ext-ev not support');
        }

        parent::__construct();

        $this->_loop = new BaseEvLoop();
    }

    /** @inheritDoc */
    public function addReadStream($stream, Closure $handler): void
    {
        if(is_resource($stream) and !isset($this->_reads[$key = (int)$stream])){
            $event = $this->_loop->io($stream, Ev::READ, $handler);
            $this->_reads[$key] = $event;
            $this->_readFds[$key] = $stream;
        }
    }

這是一個(gè)我實(shí)現(xiàn)的注冊(cè)讀取流回調(diào)的方法,最開始我的實(shí)現(xiàn)方式不是調(diào)用 $this->_loop->io() ,而是如下的實(shí)現(xiàn)方式:

$event = new EvIo($stream,Ev::READ, $handler);
$event->start();

結(jié)果并沒有生效,事件回調(diào)并沒有被成功注冊(cè)進(jìn)入,也可能是我的使用方式錯(cuò)了。

在這個(gè)地方,注冊(cè)成功的回調(diào),我原本以為會(huì)將注冊(cè)的stream傳入我的回調(diào)函數(shù),但沒想到傳入的是一個(gè)EvIo對(duì)象;如下:

# 我以為
function(resource $stream){}

# 實(shí)際上
function(EvIo $stream){}

但這個(gè)其實(shí)還好,我可以通過$stream->fd獲取對(duì)應(yīng)的resource流,但是沒想到這里卻又有一個(gè)坑;我在注冊(cè)的時(shí)候通過(int)$stream獲得的resource id,并且將其id和resource以KV的形式保存在_readFds屬性中$this->_readFds[$key] = $stream; ;但沒有想到,回調(diào)傳入的EvIo對(duì)象中通過fd屬性獲取的resource id竟然和我注冊(cè)時(shí)候的id不對(duì)等,而且每次都不對(duì)等,相當(dāng)于每一次都是新的;這時(shí)候我就懵逼了;

但還好,我發(fā)現(xiàn)每次傳入的EvIo對(duì)象是同一個(gè),我索性以EvIo對(duì)象做判斷,通過spl_object_hash()獲取EvIo對(duì)象的id,將注冊(cè)代碼改為了如下:


    /** @inheritDoc */
    public function addReadStream($stream, Closure $handler): void
    {
        if(is_resource($stream) and !isset($this->_reads[$key = (int)$stream])){
            $event = $this->_loop->io($stream, Ev::READ, $handler);
            $this->_reads[$key] = $event;
            $this->_readFds[spl_object_hash($event)] = $stream;
        }
    }

那么對(duì)應(yīng)的delReadStream也要做相應(yīng)的調(diào)整:

/**
     * @param resource|EvIo $stream 為了兼容,所以可以傳入原resource,
     *  也可以傳入回調(diào)入?yún)vIo對(duì)象
     * @return void
     */
    public function delReadStream($stream): void
    {
        if(is_resource($stream) and isset($this->_reads[$key = (int)$stream])){
            /** @var EvIo $event */
            $event = $this->_reads[$key];
            $event->stop();
            unset(
                $this->_reads[$key],
                $this->_readFds[spl_object_hash($event)]
            );
        }

        if($stream instanceof EvIo and isset($this->_readFds[spl_object_hash($stream)])){
            $stream->stop();
            $key = (int)($this->_readFds[spl_object_hash($stream)]);
            unset(
                $this->_reads[$key],
                $this->_readFds[spl_object_hash($stream)]
            );
        }
    }

WriteStream同理,這里就不多說了。

另外還有一點(diǎn)不是很重要的區(qū)別,無延遲定時(shí)器在Ev下會(huì)比IO更快,你可以理解為每次循環(huán)都是最先執(zhí)行,優(yōu)先級(jí)最高;而原生loop和event都是在IO之后,有點(diǎn)類似于每次循環(huán)的結(jié)束的時(shí)候觸發(fā)

Swoole/OpenSwoole個(gè)人覺得還是不穩(wěn)當(dāng),最好有閱讀C源代碼的能力

Swoole在測(cè)試的時(shí)候報(bào)了挺多錯(cuò)誤的,感覺和PHPunit有一些沖突,最明顯的是如下這個(gè)錯(cuò)誤:

PHPUnit\Framework\Exception: PHP Fatal error:  Uncaught Exception: Serialization of 'Closure' is not allowed

即便使用PHPunit的 @backupStaticAttributes 和 @backupGlobals 依然存在;我沒有去看源碼,個(gè)人理解可能是Swoole底層利用了一些全局的靜態(tài)導(dǎo)致了這個(gè),感興趣的可以去看看源碼。

OpenSwoole反而比Swoole好像要少一些,但也可能是我是用方式不同導(dǎo)致的,但也有一些比較令我意外的地方;在OpenSwoole的event-loop中,我利用了Event::defer嵌套Timer::tick做了一些工作,實(shí)現(xiàn)了無延遲單次定時(shí)器、無延遲循環(huán)定時(shí)器等:

Event::defer(
    function() use($id, &$timerId, $repeat, $callback){
        if($timerId = Timer::tick($repeat, $callback))
            $this->_storage->set($id, $timerId);
        }
        $callback();
    }
);

在一個(gè)測(cè)試信號(hào)的用例中:

    /** @runInSeparateProcess 測(cè)試信號(hào)相應(yīng) */
    public function testSignalResponse()
    {
        if (
            !function_exists('posix_kill') or
            !function_exists('posix_getpid')
        ) {
            $this->markTestSkipped('Signal test skipped because functions "posix_kill" and "posix_getpid" are missing.');
        }

        $count1 = $count2 = 0;
        $this->loop->addSignal(12, function () use (&$count1) {
            $count1 ++;
            $this->loop->delSignal(12);
            $this->loop->destroy();
        });

        $this->loop->addSignal(10, function () use (&$count2) {
            $count2 ++;
            $this->loop->delSignal(10);
            $this->loop->destroy();
        });

        $this->loop->addTimer(0.0,0.0,function () {
            posix_kill(posix_getpid(), 10);
        });

        # 猜測(cè)是因?yàn)榘l(fā)送信號(hào)會(huì)被轉(zhuǎn)為異步的緣故,所以當(dāng)去除這個(gè)timer時(shí),則不滿足預(yù)期
        # 但是不可以使用Event::defer,這里的無延遲定時(shí)器使用的就是Event::defer
//        $this->loop->addTimer(0.0,0.0, function (){});
        $this->loop->addTimer($this->tickTimeout,0.0, function (){});

        $this->loop->loop();

        $this->assertEquals(0, $count1);
        $this->assertEquals(1, $count2);
    }

如果上述代碼我注釋掉$this->loop->addTimer($this->tickTimeout,0.0, function (){});這行代碼,結(jié)果便達(dá)不到預(yù)期;如果使用$this->loop->addTimer(0.0,0.0, function (){});也同樣達(dá)不到預(yù)期,這里$this->loop->addTimer(0.0,0.0, function (){});可以等同看作是 Event::defer,也就是說必須有一個(gè)基于 Timer::tick 的方法掛在最后,這樣的結(jié)果才符合預(yù)期;

我的猜測(cè)是在Swoole/OpenSwoole環(huán)境下,信號(hào)的發(fā)送和接收是會(huì)被轉(zhuǎn)為異步操作,交給了swoole的輔助線程的緣故,因?yàn)閟woole中還有一條輔助線程用來調(diào)度協(xié)程,也會(huì)包含一個(gè)loop,因?yàn)檫@個(gè)信號(hào)被轉(zhuǎn)為了異步,也就是說當(dāng)我的loop循環(huán)到第二圈的時(shí)候,信號(hào)還并沒有通知到我這里,我因?yàn)槭褂昧薲efer,在第二個(gè)循環(huán)中就將當(dāng)前循環(huán)destroy()了,自然達(dá)不到預(yù)期,但當(dāng)我使用了一個(gè)Timer::tick將我的loop掛起了不止一圈,那么這時(shí)候可能就收到了異步的信號(hào)通知,那么自然也就觸發(fā)了我的回調(diào),自然也就符合了預(yù)期。

還有一個(gè)問題,就是因?yàn)閟woole沒有無延遲的定時(shí)器提供,所以我使用了Event::defer來充當(dāng)這么一個(gè)定時(shí)器,但這里會(huì)存在一個(gè)問題,無延遲的定時(shí)器可能在生產(chǎn)環(huán)境中產(chǎn)生不同的業(yè)務(wù)邏輯,同時(shí)存在好幾個(gè)不同的無延遲定時(shí)器,但這里使用了Event::defer只能注冊(cè)一個(gè)回調(diào)方法,后注冊(cè)的會(huì)覆蓋先注冊(cè)的,所以需要慎用。

這個(gè)swoole/openswoole我目前還沒有測(cè)試完,之后測(cè)試了有其他心得的話,會(huì)繼續(xù)更新,畢竟如果能夠使用一些協(xié)程,也是比較快樂的事兒,繼續(xù)折騰!

這里繼續(xù)提一句,我的 workbunny/event-loop -1.0.0 是生產(chǎn)可用的,做好了測(cè)試覆蓋的;
隨后 1.x 我會(huì)加入 openswoole,當(dāng)然我得先把它測(cè)試通過才可以;

歡迎star!PR!issue!

?? 更新于2020-05-30

上述關(guān)于OpenSwoole相關(guān)的內(nèi)容我需要糾正以下,在我調(diào)整了測(cè)試用例后沒有復(fù)現(xiàn),這里經(jīng)過反復(fù)測(cè)試得出來了幾個(gè)結(jié)論:

  1. Event::tick 可以多次創(chuàng)建不會(huì)覆蓋,也就是不影響無延遲觸發(fā)器和無延遲定時(shí)器得多次創(chuàng)建

  2. 之前測(cè)試的信號(hào)和Timer及Event相關(guān)的猜測(cè)應(yīng)該是不存在的,是可以正常調(diào)用,出現(xiàn)之前所述的問題的原因是在測(cè)試用例時(shí)并沒有注意Swoole在一個(gè)循環(huán)周期內(nèi)的事件優(yōu)先級(jí),這個(gè)需要大家關(guān)注

更多詳細(xì)的測(cè)試內(nèi)容可以參看workbunny/event-loop,已經(jīng)發(fā)布1.1.1版本,增加了OpenSwoole的支持!

?? 更新于2020-06-02

關(guān)于event-loop的補(bǔ)充

描述補(bǔ)充

  1. 本質(zhì)上是一個(gè)大大的循環(huán),內(nèi)部有自己定義好的執(zhí)行順序及規(guī)律
  2. 通常會(huì)包含定時(shí)器、IO相關(guān)事件、一些數(shù)據(jù)結(jié)構(gòu)如環(huán)形緩沖區(qū)等

不同的event-loop在效率上的影響除了之前表達(dá)過的“含C量”之外,還包含了不同語言/不同數(shù)據(jù)結(jié)構(gòu)所帶來的一些時(shí)間復(fù)雜度的影響:

  1. workerman-eventloop(開啟event拓展下)

    1. 使用了event拓展的大循環(huán)
    2. 使用了event拓展的定時(shí)器、IO相關(guān)事件等
  2. reactphp-eventloop(開啟event拓展下)

    1. 使用了php自身的循環(huán)+event提供的loop once
    2. 在循環(huán)邏輯中加入了future特性的處理
    3. 使用event拓展的定時(shí)器、IO相關(guān)事件等

第一點(diǎn)的比較,體現(xiàn)了原生PHP和C在循環(huán)上的差異
reactphp的第二點(diǎn)使用了PHP-SPL的優(yōu)先隊(duì)列來儲(chǔ)存future回調(diào),優(yōu)先級(jí)隊(duì)列本質(zhì)上是堆,時(shí)間復(fù)雜度會(huì)在向優(yōu)先隊(duì)列插入數(shù)據(jù)時(shí)候體現(xiàn)

優(yōu)化方案

libevent、libev等C庫(kù)提供了Timer,支持傳入為0的delay參數(shù),也就相當(dāng)于在下一個(gè)循環(huán)周期內(nèi)立即執(zhí)行Timer注冊(cè)的回調(diào)函數(shù),使用Timer.delay=0替代future即可

其他

  1. future的特性其實(shí)很重要,主要可以實(shí)現(xiàn)一個(gè)異步的循環(huán),結(jié)合比如有棧協(xié)程fiber或者是隊(duì)列,可以輕松實(shí)現(xiàn)一些異步的內(nèi)容;
  2. ev在Timer的創(chuàng)建和使用上有比較特殊的地方,詳細(xì)可見workbunny/event-loop的測(cè)試部分bin/benchmark;
  3. workbunny/event-loop 可以當(dāng)一個(gè)教材來看,以便更快速了解workerman及其他PHP實(shí)現(xiàn)的event-loop;
    • 更新到了1.2.x,增加了許多測(cè)試用例,也包含一部分性能測(cè)試
    • 支持Swow
  4. 比較期待workerman 5.x
  5. 最近忙的不行不行的,很少時(shí)間摸魚了,閑了再繼續(xù)貢獻(xiàn)吧

?? 更新于2023-05-11

6666 7 13
7個(gè)評(píng)論

Tinywan

感謝分享!

  • chaz6chez 2022-05-30

    感覺你住在社區(qū)了,哈哈哈哈

xiuwang

看得正起勁呢,突然沒了,哈哈,期待續(xù)更...

chaz6chez

上述關(guān)于OpenSwoole相關(guān)的內(nèi)容我需要糾正一下,在我調(diào)整了測(cè)試用例后沒有復(fù)現(xiàn),這里經(jīng)過反復(fù)測(cè)試得出來了幾個(gè)結(jié)論:

  1. Event::tick 可以多次創(chuàng)建不會(huì)覆蓋,也就是不影響無延遲觸發(fā)器和無延遲定時(shí)器得多次創(chuàng)建

  2. 之前測(cè)試的信號(hào)和Timer及Event相關(guān)的猜測(cè)應(yīng)該是不存在的,是可以正常調(diào)用,出現(xiàn)之前所述的問題的原因是在測(cè)試用例時(shí)并沒有注意Swoole在一個(gè)循環(huán)周期內(nèi)的事件優(yōu)先級(jí),這個(gè)需要大家關(guān)注

更多詳細(xì)的測(cè)試內(nèi)容可以參看workbunny/event-loop,已經(jīng)發(fā)布1.1.1版本,增加了OpenSwoole的支持!

?? 更新于2020-06-02

evilk

mark

  • 暫無評(píng)論
ikun

感謝分享

  • 暫無評(píng)論
JackDx

感謝分享!

  • 暫無評(píng)論
liangshan

mark

  • 暫無評(píng)論
年代過于久遠(yuǎn),無法發(fā)表評(píng)論

chaz6chez

5174
積分
0
獲贊數(shù)
0
粉絲數(shù)
2018-11-16 加入
??