寫(xiě)了一個(gè)rabbitmq的工具類(lèi),下面貼代碼,需要生產(chǎn)消息時(shí)直接
\util\Rabbitmq\publishWorkerQueue($queueName, $msg);
需要消費(fèi)消息時(shí):
\util\Rabbitmq\consumeWorkerQueue($queueName, $callback);
我一直有個(gè)疑問(wèn),這個(gè)rabbit的connection對(duì)象,應(yīng)該是在worker啟動(dòng)時(shí)就創(chuàng)建好,然后在需要的地方直接調(diào)用就行,否則像現(xiàn)在這樣,每生產(chǎn)一次要建立一次連接再銷(xiāo)毀,應(yīng)該會(huì)浪費(fèi)資源吧。
但是我想不明白該在什么地方怎么寫(xiě)這個(gè)全局建立連接對(duì)象的方法,對(duì)于logger對(duì)象也是同樣的疑問(wèn),希望可以得到指點(diǎn)。
具體代碼如下:
<?php
namespace util;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use support\Log;
class Rabbitmq
{
protected static function Connection()
{
$connection = new AMQPStreamConnection(
env('RABBITMQ_HOST', '127.0.0.1'),
env('RABBITMQ_PORT', 5672),
env('RABBITMQ_USER', 'guest'),
env('RABBITMQ_PASSWORD', 'guest')
);
return $connection;
}
/**
* 發(fā)布消息到worker隊(duì)列,支持一次性發(fā)布多個(gè)消息
*
* @author Aaron <chenqiang@h024.cn>
*
* @param string $queueName 隊(duì)列名稱(chēng)
* @param string|array $msgData 需要入隊(duì)的消息,單一消息為字符串類(lèi)型,多個(gè)消息是數(shù)組類(lèi)型
*/
public static function publishWorkerQueue(string $queueName = '', $msgData)
{
$log = Log::channel('producer');
if ($queueName == '') {
$queueName = env('RABBITMQ_DEFAULT_QUEUE', 'default');
}
$connection = self::Connection();
$channel = $connection->channel();
$channel->queue_declare($queueName, false, true, false, false);
if (!is_array($msgData)) {
$msgData = array($msgData);
}
// 遍歷數(shù)組,對(duì)每一個(gè)元素做入隊(duì)操作
foreach ($msgData as $dataBody) {
// 把字符串類(lèi)型的元素入隊(duì),忽略其他類(lèi)型的元素
try {
$dataBody = (string)$dataBody;
} catch (\Exception $e) {
$dataBody = json_encode($dataBody);
}
$msg = new AMQPMessage(
$dataBody,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$log->debug("{$queueName}入隊(duì)數(shù)據(jù): {$dataBody}");
$channel->basic_publish($msg, '', $queueName);
}
$channel->close();
$connection->close();
}
/**
* 從一個(gè)worker隊(duì)列里消費(fèi)一條記錄
*
* @author Aaron <chenqiang@h024.cn>
*
* @param string $queueName 隊(duì)列名稱(chēng)
* @param callable $callback 消費(fèi)的回調(diào)函數(shù),接收值$msg了隊(duì)列中的一條消息
* @param bool $autoAck 自動(dòng)確認(rèn)消費(fèi),默認(rèn)為false,需要在消費(fèi)回調(diào)里手動(dòng)執(zhí)行$msg->ack()做消費(fèi)成功確認(rèn)
*/
public static function consumeWorkerQueue(string $queueName, callable $callback, bool $autoAck = false)
{
$log = Log::channel('consumer');
if ($queueName == '') {
$queueName = env('RABBITMQ_DEFAULT_QUEUE', 'default');
}
$connection = self::Connection();
$channel = $connection->channel();
$channel->queue_declare($queueName, false, true, false, false); // 默認(rèn)自動(dòng)確認(rèn)
$channel->basic_qos(null, 1, null); // 一次只消費(fèi)一條
$channel->basic_consume($queueName, '', false, $autoAck, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
pcntl_signal_dispatch(); // 針對(duì)exit with status 9
}
}
}
進(jìn)程與進(jìn)程之間是資源阻隔的,在當(dāng)前進(jìn)程里,global或靜態(tài)變量都可以承載這個(gè)連接資源,如果你想要全局的,所有進(jìn)程共享,那不行,除非用線程
文檔里有寫(xiě),地址: http://wtbis.cn/doc/webman/others/bootstrap.html
自己注意,初始化的是當(dāng)前進(jìn)程有效的.
這個(gè)文檔我看了,同時(shí)也參考了support\bootstrap\Session和support\bootstrap\LaravelDb,但還是沒(méi)想明白具體我該怎么寫(xiě),比如我創(chuàng)建app/bootstrap/Rabbitmq.php文件,在它的start文件法寫(xiě)了$connection = new AMQPStreamConnection(...),然后呢,怎么能讓process中的RabbitConsumer.php能夠訪問(wèn)到這個(gè)$connection呢?
數(shù)據(jù)庫(kù)連接是怎么實(shí)現(xiàn)進(jìn)程內(nèi)單例的,你就可以怎么實(shí)現(xiàn),最簡(jiǎn)單的方式就是class中用static變量保存new AMQPStreamConnection(...),然后下次用的時(shí)候都用這個(gè)static變量