最近要和大數據部門合作,處理一下他們 Kafka topic 中的數據,打算用 webman 的自定義進程來實現,找了幾個包:
nmred/kafka-php // 這個包里面用到了 amphp,在 start 的時候會額外啟動一個進程去消費,擔心在 webman 的自定義進程中沒法很好地管理,所以打算放棄。
longlang/phpkafka // 這個包可以在 FPM 和 Swoole 下使用,但是還沒有發布正式版,所以也放棄了。
目前這兩個包的下載量比較大。
最后還是基于 rdkafka 文檔中的例子編寫了三個類,例子文檔:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples.html ;但是感覺不是很完美,希望亮哥大大考慮一下封裝一個擴展,在 webman 中使用。萬分感謝。
<?php
namespace App\Library\Kafka;
use RdKafka\Conf;
use RdKafka\Message;
/**
 * High-level (group-managed) Kafka consumer built on the php-rdkafka extension.
 *
 * Wraps \RdKafka\KafkaConsumer: joins the consumer group `$group_id`, subscribes
 * to `$topics`, and logs partition assignment / revocation during rebalances.
 *
 * Class HighLevelConsumer
 * @package App\Library\Kafka
 */
class HighLevelConsumer
{
    /**
     * Default `auto.offset.reset` policy: start from the earliest available offset.
     */
    const BEGIN = 'earliest';

    /**
     * Default timeout, in milliseconds, for a single consume() call.
     */
    const DEFAULT_CONSUME_TIMEOUT_MS = 120 * 1000;

    /**
     * @var \RdKafka\KafkaConsumer
     */
    private $consumer;

    /**
     * Consumer group id (`group.id`).
     * @var string
     */
    private $group_id;

    /**
     * `auto.offset.reset` policy, applied when the group has no committed offset
     * or the committed offset is out of range (e.g. 'earliest').
     * NOTE: this is a policy, not the current consumed position.
     * @var string
     */
    private $offset;

    /**
     * Auto-commit switch; any non-zero value enables `enable.auto.commit`.
     * @var int
     */
    private $auto_commit;

    /**
     * Auto-commit interval in milliseconds (`auto.commit.interval.ms`).
     * @var int
     */
    private $auto_commit_interval_ms;

    /**
     * Initial Kafka broker list (`metadata.broker.list`).
     * @var string
     */
    private $metadata_broker_list;

    /**
     * Topic names to subscribe to.
     * @var array
     */
    private $topics;

    /**
     * Build the consumer and subscribe immediately.
     *
     * @param string $metadata_broker_list Initial Kafka broker list.
     * @param string $group_id Consumer group id.
     * @param array $topics Topic names to subscribe to.
     * @param int $auto_commit Non-zero enables automatic offset commits.
     * @param int $auto_commit_interval_ms Auto-commit interval in milliseconds.
     * @param string|null $offset `auto.offset.reset` policy; null means self::BEGIN ('earliest').
     * @throws \RdKafka\Exception If the configuration or subscription is rejected.
     */
    public function __construct(
        string $metadata_broker_list,
        string $group_id,
        array $topics,
        int $auto_commit = 0,
        int $auto_commit_interval_ms = 5000,
        ?string $offset = null
    ) {
        $this->metadata_broker_list = $metadata_broker_list;
        $this->group_id = $group_id;
        $this->topics = $topics;
        $this->auto_commit = $auto_commit;
        $this->auto_commit_interval_ms = $auto_commit_interval_ms;
        $this->offset = $offset ?? self::BEGIN;
        // Create the high-level consumer and subscribe.
        $this->init();
    }

    /**
     * Configure rdkafka, create the KafkaConsumer and subscribe to the topics.
     *
     * @throws \RdKafka\Exception
     */
    private function init(): void
    {
        $conf = new Conf();

        // Rebalance callback: log the partition set and apply the assignment explicitly.
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, ?array $partitions = null) {
            /** @var \RdKafka\TopicPartition[] $partitions */
            // Guard against a null partition list so array_map() never receives null.
            $partitions = $partitions ?? [];
            $partitionStr = json_encode(array_map(function (\RdKafka\TopicPartition $partition) {
                return [
                    'topic' => $partition->getTopic(),
                    'partition' => $partition->getPartition(),
                    'offset' => $partition->getOffset(),
                ];
            }, $partitions));

            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    info(__METHOD__ . ' rebalance assign callback:' . $partitionStr);
                    $kafka->assign($partitions);
                    break;
                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    info(__METHOD__ . ' rebalance revoke callback:' . $partitionStr);
                    // Passing null clears the current assignment.
                    $kafka->assign(null);
                    break;
                default:
                    error(__METHOD__ . ' rebalance unknown callback:' . $err);
                    // Keep the rdkafka error code as the exception code instead of using it as the message.
                    throw new \Exception(__METHOD__ . ' rebalance unknown callback:' . $err, (int) $err);
            }
        });

        // All consumers sharing a group.id split the topic partitions between them.
        $conf->set('group.id', $this->group_id);
        // Initial list of Kafka brokers.
        $conf->set('metadata.broker.list', $this->metadata_broker_list);
        // Where to start when there is no committed offset, or it is out of range.
        $conf->set('auto.offset.reset', $this->offset);
        // Conf::set() takes string values: map the int switch to an explicit boolean
        // string rather than relying on int-to-string coercion ("2" is not a valid bool).
        $conf->set('enable.auto.commit', $this->auto_commit ? 'true' : 'false');
        $conf->set('auto.commit.interval.ms', (string) $this->auto_commit_interval_ms);

        $this->consumer = new \RdKafka\KafkaConsumer($conf);
        // Subscribe to the configured topics.
        $this->consumer->subscribe($this->topics);
    }

    /**
     * Poll the subscribed topics for a single message.
     *
     * @param int $timeout_ms Maximum time to wait, in milliseconds (default 120 s).
     * @return Message|null The message on success; null on partition EOF or timeout.
     * @throws \Exception On any other error carried by the message (code = rdkafka error code).
     * @throws \RdKafka\Exception If the underlying consume call fails.
     */
    public function consume(int $timeout_ms = self::DEFAULT_CONSUME_TIMEOUT_MS): ?Message
    {
        $message = $this->consumer->consume($timeout_ms);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                // Payloads may be binary; without INVALID_UTF8_SUBSTITUTE json_encode()
                // returns false and the log line would be silently empty.
                info(__METHOD__ . ' messages : ' . json_encode(
                    $message,
                    JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_INVALID_UTF8_SUBSTITUTE
                ));
                return $message;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                info(__METHOD__ . ' No more messages : ' . $message->errstr());
                return null;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                info(__METHOD__ . ' Timeout :' . $message->errstr() . ' code:' . $message->err . PHP_EOL);
                return null;
            default:
                error(__METHOD__ . ' Exception :' . $message->errstr() . ' code:' . $message->err . PHP_EOL);
                throw new \Exception(__METHOD__ . ' ' . $message->errstr(), $message->err);
        }
    }

    /**
     * Synchronously commit offsets.
     *
     * @param Message|\RdKafka\TopicPartition[]|null $message_or_offsets What to commit:
     *        null (default) commits the offsets of the current assignment; a Message
     *        commits that message's position; an array of TopicPartition commits exactly those offsets.
     * @throws \RdKafka\Exception
     */
    public function commit($message_or_offsets = null): void
    {
        $this->consumer->commit($message_or_offsets);
    }

    /**
     * Asynchronously commit offsets.
     *
     * @param Message|\RdKafka\TopicPartition[]|null $message_or_offsets Same meaning as in commit().
     * @throws \RdKafka\Exception
     */
    public function commitAsync($message_or_offsets = null): void
    {
        $this->consumer->commitAsync($message_or_offsets);
    }

    /**
     * Close the consumer (leave the group). Intended to be called when the worker process stops.
     *
     * @throws \RdKafka\Exception
     */
    public function close(): void
    {
        $this->consumer->close();
    }
}
此類基于 rdkafka 擴展實現,大佬看下有啥問題沒?