国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

期待大佬能基于rdkafka封裝一個(gè)webman能用擴(kuò)展?

zeus

最近要和大數(shù)據(jù)部門(mén)合作,處理下他們的kafka-topic的數(shù)據(jù),打算用webman的自定義進(jìn)程實(shí)現(xiàn),找了幾個(gè)包,

nmred/kafka-php // 這個(gè)包里面有amphp ,在start的時(shí)候會(huì)多啟動(dòng)一個(gè)進(jìn)程去消費(fèi),擔(dān)心在webman的自定義進(jìn)程中沒(méi)法很好的管理, 所以打算放棄。
longlang/phpkafka // 這個(gè)包可在fpm和swoole下使用,但是還沒(méi)出正式版,放棄了。

目前是這兩個(gè)包 下載量比較大了。

最后還是基于rdkafka的文檔中例子編寫(xiě)了三個(gè)類,例子文檔:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples.html ; 但是感覺(jué)不是很完美, 希望亮哥大大考慮下封裝一個(gè)擴(kuò)展,在webman中使用。萬(wàn)分感謝。

2963 4 1
4個(gè)回答

walkor 打賞

哪里不完美可以發(fā)出來(lái)大家討論哈。我目前在做workerman v5 和 webman v1.2相關(guān)的開(kāi)發(fā),我一個(gè)人精力有限,對(duì)kafka并不熟悉,目前而言實(shí)在沒(méi)有精力從頭研究kafka。

  • zeus 2022-01-01

    嗯嗯,理解,我先用最簡(jiǎn)單的方式實(shí)現(xiàn)了,過(guò)幾天發(fā)上來(lái)大家?guī)兔聪鹿8兄x。

  • tanhongbin 2022-01-06

    老大,希望在文檔加個(gè)模塊寫(xiě)入每次更新內(nèi)容,以及升級(jí)命令就更好了,要不然每次都得去github上去找

Tinywan

什么叫擔(dān)心在webman的自定義進(jìn)程中沒(méi)法很好的管理, 所以打算放棄。 請(qǐng)問(wèn)你有嘗試去管理嗎?真嘗試了,遇到的問(wèn)題又是什么?所有自己業(yè)務(wù)的擴(kuò)展第一步的自己先嘗試,有問(wèn)題大家一起解決。別動(dòng)不動(dòng)就讓作者你給寫(xiě)個(gè)擴(kuò)展

橘叔

先自己嘗試一下,閱讀一下包里面是否有靜態(tài)變量,全局變量,等操作,然后進(jìn)行壓測(cè)看一下是否存在內(nèi)存泄露. 有修改提交pr 不就行了...

zeus
<?php

namespace App\Library\Kafka;

use RdKafka\Conf;
use RdKafka\Message;

/**
 * 高級(jí)消費(fèi)者
 * Class HighLevelConsumer
 * @package App\Library\Kafka
 */
class HighLevelConsumer
{
    /**
     * @var \RdKafka\KafkaConsumer
     */
    private $consumer;

    /**
     * 消費(fèi)開(kāi)始的offset
     */
    const BEGIN = 'earliest';

    /**
     * 分組名
     */
    private $group_id;

    /**
     * 當(dāng)前消費(fèi)到的位置
     */
    private $offset;

    /**
     * 是否自動(dòng)提交offset
     * @var int
     */
    private $auto_commit;

    /**
     * 自動(dòng)提交offset時(shí)間毫秒
     * @var int
     */
    private $auto_commit_interval_ms;

    /**
     * Kafka服務(wù)器地址
     */
    private $metadata_broker_list;

    /**
     * 主題數(shù)組
     */
    private $topics;

    /**
     * 構(gòu)造函數(shù)
     * @param string $metadata_broker_list 服務(wù)器地址
     * @param string $group_id 分組名
     * @param array $topics 主題數(shù)組
     * @param string|null $offset 消費(fèi)偏移量
     * @throws \RdKafka\Exception
     */
    public function __construct(string $metadata_broker_list, string $group_id, array $topics, int $auto_commit = 0,
                                int    $auto_commit_interval_ms = 5000, string $offset = null)
    {
        $this->metadata_broker_list = $metadata_broker_list;
        $this->group_id = $group_id;
        $this->topics = $topics;
        $this->auto_commit = $auto_commit;
        $this->auto_commit_interval_ms = $auto_commit_interval_ms;
        $this->offset = $offset ?? self::BEGIN;

        //初始化高級(jí)消費(fèi)者
        $this->init();
    }

    /**
     * 初始化
     * @throws \RdKafka\Exception
     */
    private function init()
    {

        $conf = new Conf();

// Set a rebalance callback to log partition assignments (optional)
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            /**
             * @var \RdKafka\TopicPartition[] $partitions
             */
            $partitionStr = json_encode(array_map(function (\RdKafka\TopicPartition $partition) {
                return [
                    'topic' => $partition->getTopic(),
                    'partition' => $partition->getPartition(),
                    'offset' => $partition->getOffset(),
                ];
            }, $partitions));

            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    info(__METHOD__ . ' rebalance assign callback:' . $partitionStr);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    info(__METHOD__ . ' rebalance revoke callback:' . $partitionStr);
                    $kafka->assign(NULL);
                    break;

                default:
                    error(__METHOD__ . ' rebalance unknown callback:' . $err);
                    throw new \Exception($err);
            }
        });

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
        $conf->set('group.id', $this->group_id);

// Initial list of Kafka brokers
        $conf->set('metadata.broker.list', $this->metadata_broker_list);

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'earliest': start from the beginning
        $conf->set('auto.offset.reset', $this->offset);

        //自動(dòng)提交offset開(kāi)關(guān) 和 自動(dòng)提交時(shí)間間隔
        $conf->set('enable.auto.commit', $this->auto_commit);
        $conf->set('auto.commit.interval.ms', $this->auto_commit_interval_ms);

        $this->consumer = new \RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test'
        $this->consumer->subscribe($this->topics);
    }

    /**
     * 真實(shí)消費(fèi)
     * @return Message|null
     * @throws \RdKafka\Exception
     */
    public function consume(): ?Message
    {
        $message = $this->consumer->consume(120 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                info(__METHOD__ . ' messages : ' . json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE));
                return $message;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                info(__METHOD__ . ' No more messages : ' . $message->errstr());
                return null;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                info(__METHOD__ . ' Timeout :' . $message->errstr() . ' code:' . $message->err . PHP_EOL);
                return null;
            default:
                error(__METHOD__ . ' Exception :' . $message->errstr() . ' code:' . $message->err . PHP_EOL);
                throw new \Exception(__METHOD__ . ' ' . $message->errstr(), $message->err);
        }
    }

    /**
     * 同步提交offset
     * @throws \RdKafka\Exception
     */
    public function commit(): void
    {
        $this->consumer->commit();
    }

    /**
     * 異步提交offset
     * @throws \RdKafka\Exception
     */
    public function commitAsync(): void
    {
        $this->consumer->commitAsync();
    }
}

此類基于rdKafka擴(kuò)展實(shí)現(xiàn),大佬看下有啥問(wèn)題沒(méi)?

年代過(guò)于久遠(yuǎn),無(wú)法發(fā)表回答
??