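Following the official workerman MQTT client, I ran this command in webman: composer require workerman/mqtt
After that, running the client and receiving MQTT messages works fine,
but once I started writing business logic, such as the code below: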
Log::info($topic);
Log::info($content);
it throws this error:
TypeError: Argument 1 passed to support\Log::handlers() must be of the type array, null given, called in D:\develop\php\webman_admin\vendor\workerman\webman-framework\src\support\Log.php
on line 55 and defined in D:\develop\php\webman_admin\vendor\workerman\webman-framework\src\support\Log.php:67
Here is the complete code.
Startup script:
<?php
declare(strict_types=1);

use Workerman\Worker;
use \app\mqttService\MqttService;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function(){
    $options = [
        'username' => 'admin',
        'password' => '123456',
    ];
    $mqtt = new Workerman\Mqtt\Client('mqtt://192.168.1.5:1883',$options);
    // Subscribe once the connection to the broker is established
    $mqtt->onConnect = function($mqtt) {
        $mqtt->subscribe('mqtt/face/1400754/Snap');
    };
    $mqttService = new MqttService();
    // Hand every incoming message to the business service
    $mqtt->onMessage = function($topic, $content) use ($mqttService){
        $mqttService->handlerMessage($topic,$content);
        //var_dump($topic, $content);
    };
    $mqtt->connect();
};
Worker::runAll();
Business code, the logic inside $mqttService->handlerMessage:
public function handlerMessage($topic,$content) {
    if (!empty($topic)) {
        //echo "topic:".$topic.'\r\n';
        Log::info($topic);
        Log::info($content);
    }
}
Full exception trace:
TypeError: Argument 1 passed to support\Log::handlers() must be of the type array, null given, called in D:\develop\php\webman_admin\vendor\workerman\webman-framework\src\support\Log.php
on line 55 and defined in D:\develop\php\webman_admin\vendor\workerman\webman-framework\src\support\Log.php:67
Stack trace:
#0 D:\develop\php\webman_admin\vendor\workerman\webman-framework\src\support\Log.php(55): support\Log::handlers(NULL)
#1 D:\develop\php\webman_admin\vendor\workerman\webman-framework\src\support\Log.php(139): support\Log::channel()
#2 D:\develop\php\webman_admin\app\mqttService\MqttService.php(26): support\Log::__callStatic('info', Array)
#3 D:\develop\php\webman_admin\mqtt_subscribe.php(21): app\mqttService\MqttService->handlerMessage('mqtt/face/basic', '{\r\n"operator": ...')
#4 [internal function]: {closure}('mqtt/face/basic', '{\r\n"operator": ...', Object(Workerman\Mqtt\Client))
#5 D:\develop\php\webman_admin\vendor\workerman\mqtt\src\Client.php(599): call_user_func(Object(Closure), 'mqtt/face/basic', '{\r\n"operator": ...', Object(Workerman\Mqtt\Client))
#6 D:\develop\php\webman_admin\vendor\workerman\workerman\Connection\TcpConnection.php(646): Workerman\Mqtt\Client->onConnectionMessage(Object(Workerman\Connection\AsyncTcpConnection), Array)
#7 D:\develop\php\webman_admin\vendor\workerman\workerman\Events\Select.php(311): Workerman\Connection\TcpConnection->baseRead(Resource id #42)
#8 D:\develop\php\webman_admin\vendor\workerman\workerman\Worker.php(1479): Workerman\Events\Select->loop()
#9 D:\develop\php\webman_admin\vendor\workerman\workerman\Worker.php(1399): Workerman\Worker::forkWorkersForWindows()
#10 D:\develop\php\webman_admin\vendor\workerman\workerman\Worker.php(560): Workerman\Worker::forkWorkers()
#11 D:\develop\php\webman_admin\mqtt_subscribe.php(27): Workerman\Worker::runAll()
#12 {main}
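My goal is simply to consume messages with workerman/mqtt and then write business logic using webman's database, redis and other components.
When webman starts, it reads its configuration and initializes the webman runtime environment, including support\Log. Your mqtt_subscribe.php, however, is a standalone script rather than part of webman, so none of that initialization happens and support\Log cannot be used.
You can refer to the custom process section of the webman documentation and run your MQTT logic inside a webman custom process. That way it runs within webman's runtime environment.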
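As a rough sketch of the suggestion above, a custom process is registered in webman's config/process.php. The key name mqtt-client and the class name app\process\MqttClient are assumptions chosen for illustration; adjust them to your project layout. No listen entry is given, on the assumption that the process only acts as an MQTT client and does not need to accept incoming connections.

```php
<?php
// config/process.php (sketch; the key and class name are hypothetical)
return [
    'mqtt-client' => [
        // Class whose onWorkerStart() creates the Workerman\Mqtt\Client
        'handler' => app\process\MqttClient::class,
        // A single process, so each message is consumed once
        'count'   => 1,
    ],
];
```

After changing this file, webman needs to be restarted for the process to be started.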
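It works now, but there is a problem. It reports an error: Uncaught Exception: class \Protocols\Mqtt not exist, and when I looked, that class indeed does not exist. If I initialize directly in onWorkerStart, onConnect and onMessage are never triggered, which is awkward, so it seems it still has to be written into the protocol: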
class MqttClient
{
    private ?Client $mqtt = null;

    public function onWorkerStart()
    {
        Log::info("started:");
        if (empty($this->mqtt)) {
            $options = [
                'username' => 'admin',
                'password' => '123456',
            ];
            $this->mqtt = new Client('mqtt://192.168.1.5:1883',$options);
            $this->mqtt->subscribe('mqtt/test/1400754/Snap');
            Log::info("initialization executed:");
        }
        Log::info("started 1:");
    }

    public function onConnect(TcpConnection $connection)
    {
        // $options = [
        //     'username' => 'admin',
        //     'password' => '123456',
        // ];
        // $this->mqtt = new Client('mqtt://192.168.1.5:1883',$options);
        // $this->mqtt->subscribe('mqtt/face/1400754/Snap');
        Log::info("connected to mqtt");
    }

    public function onMessage(TcpConnection $connection, $data)
    {
        Log::info("received data: ".$data);
        // Double quotes so that \r\n is emitted as a real line break
        echo $connection->id."\r\n";
        echo $data."\r\n";
        // echo "message arrived\r\n";
        // $connection->send($data);
        // $mqttHandler = new MqttHandler();
        // $mqttHandler->handlerMessage($topic,$content);
    }

    public function onClose(TcpConnection $connection)
    {
        echo "onClose\n";
    }
}
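Check whether you are using the wrong client. Use Workerman\Mqtt\Client.
MqttClient's onConnect and onMessage are triggered only when MqttClient acts as a server with a listening port and a client connects to that port. Otherwise they are never triggered.
I'm using it now and it connects, but there is a problem: onConnect and onMessage are not triggered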
<?php
namespace process;

use support\Log;
use Workerman\Connection\TcpConnection;
use Workerman\Mqtt\Client;

class MqttClient
{
    private ?Client $mqtt = null;

    public function onWorkerStart()
    {
        Log::info("started:");
        if (empty($this->mqtt)) {
            $options = [
                'username' => 'admin',
                'password' => '123456',
            ];
            $this->mqtt = new Client('mqtt://192.168.1.5:1883',$options);
            $this->mqtt->connect();
            Log::info("initialization executed:"); // this ran
        }
        Log::info("started 1:");
    }

    public function onConnect(TcpConnection $connection)
    {
        // Double quotes so that \r\n is emitted as a real line break
        echo $connection->id."\r\n";
        // $options = [
        //     'username' => 'admin',
        //     'password' => '123456',
        // ];
        // $this->mqtt = new Client('mqtt://192.168.1.5:1883',$options);
        // $this->mqtt->subscribe('mqtt/face/1400754/Snap');
        $this->mqtt->subscribe('mqtt/test/1400754/Snap');
        Log::info("connected to mqtt"); // this did not run
    }

    public function onMessage(TcpConnection $connection, $data)
    {
        Log::info("received data: ".$data); // this did not run either
        echo $connection->id."\r\n";
        echo $data."\r\n";
        // echo "message arrived\r\n";
        // $connection->send($data);
        // $mqttHandler = new MqttHandler();
        // $mqttHandler->handlerMessage($topic,$content);
    }

    public function onClose(TcpConnection $connection)
    {
        $this->mqtt->close();
        $this->mqtt = null;
        echo "onClose\n";
    }
}
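Previously, when I used the Workerman\Mqtt\Client component on its own, it could connect and receive messages
The server also received the connection request message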
{"type":1,"protocol_name":"MQTT","protocol_level":4,"clean_session":1,"user_name":"admin","password":"123456","keep_alive":50,"client_id":"workerman-mqtt-client-649296651"}
It's just that no subsequent messages are received, which is strange
<?php
namespace app\process;

class MqttClient
{
    public function onWorkerStart()
    {
        $mqtt = new \Workerman\Mqtt\Client('mqtt://broker.emqx.io:1883', array(
            'debug' => true,
            "username"=>"rw", "password"=>"readwrite"
        ));
        // Callbacks are set on the $mqtt client object itself
        $mqtt->onConnect = function($mqtt) {
            $mqtt->subscribe('workerman');
        };
        $mqtt->onMessage = function($topic, $content){
            echo "topic:$topic content:$content\n";
        };
        // Publish a test message every 2 seconds
        \Workerman\Timer::add(2, function() use ($mqtt) {
            $mqtt->publish('workerman', 'hello workerman mqtt');
        });
        $mqtt->connect();
    }
}
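Set the callbacks on the client like this: $mqtt->onConnect = function(){};
The $mqtt client will not trigger your public onConnect(){},
because public onConnect(){} is triggered only when the process acts as a server. The same goes for onMessage, so don't mix the two up.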
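Putting the two answers together, the original goal (consume MQTT messages and log them with support\Log) might look roughly like the sketch below. It assumes the class is registered as a webman custom process so that the webman environment is initialized before onWorkerStart runs. The broker address, credentials and topic are taken from the question. The namespace app\process is an assumption and should match your process configuration.

```php
<?php
namespace app\process;

use support\Log;
use Workerman\Mqtt\Client;

class MqttClient
{
    public function onWorkerStart()
    {
        $mqtt = new Client('mqtt://192.168.1.5:1883', [
            'username' => 'admin',
            'password' => '123456',
        ]);

        // Client-side callback: fires when the broker accepts the connection
        $mqtt->onConnect = function ($mqtt) {
            $mqtt->subscribe('mqtt/face/1400754/Snap');
        };

        // Client-side callback: fires for every message on a subscribed topic
        $mqtt->onMessage = function ($topic, $content) {
            if (!empty($topic)) {
                Log::info($topic);
                Log::info($content);
            }
        };

        $mqtt->connect();
    }
}
```

The class defines no public onConnect or onMessage methods, since those would only be called for connections arriving on a port the process itself listens on.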