国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

期待大佬能基于rdkafka封裝一個webman能用擴展?

zeus

最近要和大數(shù)據(jù)部門合作,處理下他們的kafka-topic的數(shù)據(jù),打算用webman的自定義進程實現(xiàn),找了幾個包,

nmred/kafka-php // 這個包里面有amphp ,在start的時候會多啟動一個進程去消費,擔心在webman的自定義進程中沒法很好的管理, 所以打算放棄。
longlang/phpkafka // 這個包可在fpm和swoole下使用,但是還沒出正式版,放棄了。

目前是這兩個包 下載量比較大了。

最后還是基于rdkafka的文檔中例子編寫了三個類,例子文檔:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples.html ; 但是感覺不是很完美, 希望亮哥大大考慮下封裝一個擴展,在webman中使用。萬分感謝。

2864 4 1
4個回答

walkor 打賞

哪里不完美可以發(fā)出來大家討論哈。我目前在做workerman v5 和 webman v1.2相關(guān)的開發(fā),我一個人精力有限,對kafka并不熟悉,目前而言實在沒有精力從頭研究kafka。

  • zeus 2022-01-01

    嗯嗯,理解,我先用最簡單的方式實現(xiàn)了,過幾天發(fā)上來大家?guī)兔聪鹿8兄x。

  • tanhongbin 2022-01-06

    老大,希望在文檔加個模塊寫入每次更新內(nèi)容,以及升級命令就更好了,要不然每次都得去github上去找

Tinywan

什么叫擔心在webman的自定義進程中沒法很好的管理, 所以打算放棄。 請問你有嘗試去管理嗎?真嘗試了,遇到的問題又是什么?所有自己業(yè)務(wù)的擴展第一步的自己先嘗試,有問題大家一起解決。別動不動就讓作者你給寫個擴展

橘叔

先自己嘗試一下,閱讀一下包里面是否有靜態(tài)變量,全局變量,等操作,然后進行壓測看一下是否存在內(nèi)存泄露. 有修改提交pr 不就行了...

zeus
<?php

namespace App\Library\Kafka;

use RdKafka\Conf;
use RdKafka\Message;

/**
 * 高級消費者
 * Class HighLevelConsumer
 * @package App\Library\Kafka
 */
class HighLevelConsumer
{
    /**
     * @var \RdKafka\KafkaConsumer
     */
    private $consumer;

    /**
     * 消費開始的offset
     */
    const BEGIN = 'earliest';

    /**
     * 分組名
     */
    private $group_id;

    /**
     * 當前消費到的位置
     */
    private $offset;

    /**
     * 是否自動提交offset
     * @var int
     */
    private $auto_commit;

    /**
     * 自動提交offset時間毫秒
     * @var int
     */
    private $auto_commit_interval_ms;

    /**
     * Kafka服務(wù)器地址
     */
    private $metadata_broker_list;

    /**
     * 主題數(shù)組
     */
    private $topics;

    /**
     * 構(gòu)造函數(shù)
     * @param string $metadata_broker_list 服務(wù)器地址
     * @param string $group_id 分組名
     * @param array $topics 主題數(shù)組
     * @param string|null $offset 消費偏移量
     * @throws \RdKafka\Exception
     */
    public function __construct(string $metadata_broker_list, string $group_id, array $topics, int $auto_commit = 0,
                                int    $auto_commit_interval_ms = 5000, string $offset = null)
    {
        $this->metadata_broker_list = $metadata_broker_list;
        $this->group_id = $group_id;
        $this->topics = $topics;
        $this->auto_commit = $auto_commit;
        $this->auto_commit_interval_ms = $auto_commit_interval_ms;
        $this->offset = $offset ?? self::BEGIN;

        //初始化高級消費者
        $this->init();
    }

    /**
     * 初始化
     * @throws \RdKafka\Exception
     */
    private function init()
    {

        $conf = new Conf();

// Set a rebalance callback to log partition assignments (optional)
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            /**
             * @var \RdKafka\TopicPartition[] $partitions
             */
            $partitionStr = json_encode(array_map(function (\RdKafka\TopicPartition $partition) {
                return [
                    'topic' => $partition->getTopic(),
                    'partition' => $partition->getPartition(),
                    'offset' => $partition->getOffset(),
                ];
            }, $partitions));

            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    info(__METHOD__ . ' rebalance assign callback:' . $partitionStr);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    info(__METHOD__ . ' rebalance revoke callback:' . $partitionStr);
                    $kafka->assign(NULL);
                    break;

                default:
                    error(__METHOD__ . ' rebalance unknown callback:' . $err);
                    throw new \Exception($err);
            }
        });

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
        $conf->set('group.id', $this->group_id);

// Initial list of Kafka brokers
        $conf->set('metadata.broker.list', $this->metadata_broker_list);

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'earliest': start from the beginning
        $conf->set('auto.offset.reset', $this->offset);

        //自動提交offset開關(guān) 和 自動提交時間間隔
        $conf->set('enable.auto.commit', $this->auto_commit);
        $conf->set('auto.commit.interval.ms', $this->auto_commit_interval_ms);

        $this->consumer = new \RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test'
        $this->consumer->subscribe($this->topics);
    }

    /**
     * 真實消費
     * @return Message|null
     * @throws \RdKafka\Exception
     */
    public function consume(): ?Message
    {
        $message = $this->consumer->consume(120 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                info(__METHOD__ . ' messages : ' . json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE));
                return $message;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                info(__METHOD__ . ' No more messages : ' . $message->errstr());
                return null;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                info(__METHOD__ . ' Timeout :' . $message->errstr() . ' code:' . $message->err . PHP_EOL);
                return null;
            default:
                error(__METHOD__ . ' Exception :' . $message->errstr() . ' code:' . $message->err . PHP_EOL);
                throw new \Exception(__METHOD__ . ' ' . $message->errstr(), $message->err);
        }
    }

    /**
     * 同步提交offset
     * @throws \RdKafka\Exception
     */
    public function commit(): void
    {
        $this->consumer->commit();
    }

    /**
     * 異步提交offset
     * @throws \RdKafka\Exception
     */
    public function commitAsync(): void
    {
        $this->consumer->commitAsync();
    }
}

此類基于rdKafka擴展實現(xiàn),大佬看下有啥問題沒?

年代過于久遠,無法發(fā)表回答
??