国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

在webman里如何在worker啟動(dòng)時(shí)全局初始化rabbitmq連接對(duì)象

cqqjj1029

寫(xiě)了一個(gè)rabbitmq的工具類(lèi),下面貼代碼,需要生產(chǎn)消息時(shí)直接

\util\Rabbitmq\publishWorkerQueue($queueName, $msg);

需要消費(fèi)消息時(shí):

\util\Rabbitmq\consumeWorkerQueue($queueName, $callback);

我一直有個(gè)疑問(wèn),這個(gè)rabbit的connection對(duì)象,應(yīng)該是在worker啟動(dòng)時(shí)就創(chuàng)建好,然后在需要的地方直接調(diào)用就行,否則像現(xiàn)在這樣,每生產(chǎn)一次要建立一次連接再銷(xiāo)毀,應(yīng)該會(huì)浪費(fèi)資源吧。
但是我想不明白該在什么地方怎么寫(xiě)這個(gè)全局建立連接對(duì)象的方法,對(duì)于logger對(duì)象也是同樣的疑問(wèn),希望可以得到指點(diǎn)。

具體代碼如下:

<?php

namespace util;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use support\Log;

class Rabbitmq
{
    protected static function Connection()
    {
        $connection = new AMQPStreamConnection(
            env('RABBITMQ_HOST', '127.0.0.1'),
            env('RABBITMQ_PORT', 5672),
            env('RABBITMQ_USER', 'guest'),
            env('RABBITMQ_PASSWORD', 'guest')
        );
        return $connection;
    }

    /**
     * 發(fā)布消息到worker隊(duì)列,支持一次性發(fā)布多個(gè)消息
     *
     * @author Aaron <chenqiang@h024.cn>
     *
     * @param string $queueName     隊(duì)列名稱(chēng)
     * @param string|array $msgData 需要入隊(duì)的消息,單一消息為字符串類(lèi)型,多個(gè)消息是數(shù)組類(lèi)型
     */
    public static function publishWorkerQueue(string $queueName = '', $msgData)
    {
        $log = Log::channel('producer');
        if ($queueName == '') {
            $queueName = env('RABBITMQ_DEFAULT_QUEUE', 'default');
        }
        $connection = self::Connection();
        $channel = $connection->channel();
        $channel->queue_declare($queueName, false, true, false, false);
        if (!is_array($msgData)) {
            $msgData = array($msgData);
        }
        // 遍歷數(shù)組,對(duì)每一個(gè)元素做入隊(duì)操作
        foreach ($msgData as $dataBody) {
            // 把字符串類(lèi)型的元素入隊(duì),忽略其他類(lèi)型的元素
            try {
                $dataBody = (string)$dataBody;
            } catch (\Exception $e) {
                $dataBody = json_encode($dataBody);
            }
            $msg = new AMQPMessage(
                $dataBody,
                array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
            );
            $log->debug("{$queueName}入隊(duì)數(shù)據(jù): {$dataBody}");
            $channel->basic_publish($msg, '', $queueName);
        }
        $channel->close();
        $connection->close();
    }

    /**
     * 從一個(gè)worker隊(duì)列里消費(fèi)一條記錄
     *
     * @author Aaron <chenqiang@h024.cn>
     *
     * @param string $queueName     隊(duì)列名稱(chēng)
     * @param callable $callback    消費(fèi)的回調(diào)函數(shù),接收值$msg了隊(duì)列中的一條消息
     * @param bool $autoAck         自動(dòng)確認(rèn)消費(fèi),默認(rèn)為false,需要在消費(fèi)回調(diào)里手動(dòng)執(zhí)行$msg->ack()做消費(fèi)成功確認(rèn)
     */
    public static function consumeWorkerQueue(string $queueName, callable $callback, bool $autoAck = false)
    {
        $log = Log::channel('consumer');
        if ($queueName == '') {
            $queueName = env('RABBITMQ_DEFAULT_QUEUE', 'default');
        }
        $connection = self::Connection();
        $channel = $connection->channel();
        $channel->queue_declare($queueName, false, true, false, false); // 默認(rèn)自動(dòng)確認(rèn)
        $channel->basic_qos(null, 1, null); // 一次只消費(fèi)一條
        $channel->basic_consume($queueName, '', false, $autoAck, false, false, $callback);
        while ($channel->is_open()) {
            $channel->wait();
            pcntl_signal_dispatch();    // 針對(duì)exit with status 9
        }
    }
}
2156 2 0
2個(gè)回答

chaz6chez

進(jìn)程與進(jìn)程之間是資源阻隔的,在當(dāng)前進(jìn)程里,global或靜態(tài)變量都可以承載這個(gè)連接資源,如果你想要全局的,所有進(jìn)程共享,那不行,除非用線程

  • 暫無(wú)評(píng)論
2548a

文檔里有寫(xiě),地址: http://wtbis.cn/doc/webman/others/bootstrap.html
自己注意,初始化的是當(dāng)前進(jìn)程有效的.

  • cqqjj1029 2022-05-07

    這個(gè)文檔我看了,同時(shí)也參考了support\bootstrap\Session和support\bootstrap\LaravelDb,但還是沒(méi)想明白具體我該怎么寫(xiě),比如我創(chuàng)建app/bootstrap/Rabbitmq.php文件,在它的start文件法寫(xiě)了$connection = new AMQPStreamConnection(...),然后呢,怎么能讓process中的RabbitConsumer.php能夠訪問(wèn)到這個(gè)$connection呢?

  • chaz6chez 2022-05-07

    數(shù)據(jù)庫(kù)連接是怎么實(shí)現(xiàn)進(jìn)程內(nèi)單例的,你就可以怎么實(shí)現(xiàn),最簡(jiǎn)單的方式就是class中用static變量保存new AMQPStreamConnection(...),然后下次用的時(shí)候都用這個(gè)static變量

年代過(guò)于久遠(yuǎn),無(wú)法發(fā)表回答
??