在gatewaywork的bussiness中使用redis-queue,onconnect時(shí)間中,給隊(duì)列發(fā)送數(shù)據(jù)
/**
* 當(dāng)客戶端連接時(shí)觸發(fā)
* 如果業(yè)務(wù)不需此回調(diào)可以刪除onConnect
*
* @param int $client_id 連接id
*/
public static function onConnect($client_id)
{
// 向當(dāng)前client_id發(fā)送數(shù)據(jù)
// Gateway::sendToClient($client_id, "當(dāng)前在線".Gateway::getAllClientIdCount()." \r\n");
// 向所有人發(fā)送
try {
$client = new Client('redis://127.0.0.1:6379');
$client->send('user-1', ['demo', 'data']);
echo "aaa",PHP_EOL;
} catch (\Throwable $th) {
print_r($th);
}
另外在start_queue.php中消費(fèi)
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://wtbis.cn/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
use \Workerman\Worker;
use \Workerman\Timer;
use \Workerman\WebServer;
use \GatewayWorker\Gateway;
use \GatewayWorker\BusinessWorker;
use Workerman\RedisQueue\Client;
use \Workerman\Autoloader;
// 自動(dòng)加載類
require_once __DIR__ . '/../../vendor/autoload.php';
// bussinessWorker 進(jìn)程
$worker = new Worker();
// worker名稱
$worker->name = 'queue';
// bussinessWorker進(jìn)程數(shù)量
$worker->count = 1;
// 服務(wù)注冊(cè)地址
// $worker->registerAddress = '127.0.0.1:1238';
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// 訂閱
$client->subscribe('user-1', function($data){
echo "user-1\n";
// var_export($data);
file_put_contents('a.txt','bbb');
});
// $client->send('user-1', ['some', 'data']);
};
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
經(jīng)過測(cè)試,100次連接,只消費(fèi)不到一半,有的時(shí)候 只有10次不到。單獨(dú)在start_queue中用定時(shí)器,定時(shí)推送消費(fèi),就沒什么問題。而且,似乎還有粘包的問題,因?yàn)榘l(fā)現(xiàn)有幾行輸出是多個(gè)aaa連著的,沒有換行
不好意思,是我搞錯(cuò)了,我在application下直接執(zhí)行了一個(gè)消費(fèi)隊(duì)列進(jìn)程,導(dǎo)致在目錄下面stop的時(shí)候,沒結(jié)束這個(gè)消費(fèi)進(jìn)程