現(xiàn)有第三方服務(wù),實(shí)現(xiàn)了TCP服務(wù)端,稱為服務(wù)A
。基本用法就是通過創(chuàng)建TCP客戶端連接服務(wù)A
,對(duì)服務(wù)A
發(fā)送各種指令,獲得響應(yīng),以及訂閱服務(wù)A的各種事件,服務(wù)A會(huì)主動(dòng)推送事件通知過來。
我想用wokerman的做一個(gè)中間層,負(fù)責(zé)與服務(wù)A通信,并以wokerman為核心提供一些服務(wù),共內(nèi)部的其他程序使用。流程為本地其他項(xiàng)目————調(diào)用workerman中間層————調(diào)用服務(wù)A
,其中wokerman中間層
負(fù)責(zé)將服務(wù)A
的響應(yīng)結(jié)果返回給本地其他項(xiàng)目
我想知道wokerman中間層提供何種服務(wù)才能簡(jiǎn)便供本地其他項(xiàng)目調(diào)用?
已知與服務(wù)A進(jìn)行TCP交互,TCP服務(wù)端會(huì)響應(yīng)的數(shù)據(jù)格式為兩種,一種是同步命令型
,一種是異步訂閱型
。
同步命令型
:指的是TCP客戶端發(fā)出命令指令,TCP服務(wù)端會(huì)阻塞的返回指令的執(zhí)行結(jié)果,與http協(xié)議一樣 請(qǐng)求/響應(yīng)
異步訂閱型
:指的是TCP客戶端發(fā)出訂閱指令,TCP服務(wù)端會(huì)立即返回訂閱結(jié)果,并定時(shí)向TCP客戶端發(fā)送訂閱的數(shù)據(jù),直到客戶端收集了足夠的數(shù)據(jù),主動(dòng)取消訂閱或者斷開連接為止。
初步實(shí)現(xiàn)wokerman中間層的想法是:
1.中間層搭建一個(gè)http服務(wù)端,用來接收本地其他項(xiàng)目的url請(qǐng)求(指令)
2.中間層接收到url發(fā)送過來的指令后,判斷是否為同步命令型指令
,如果是,則建立TCP客戶端與服務(wù)A交互,并同步等待服務(wù)A的響應(yīng)結(jié)果,并將響應(yīng)結(jié)果作為response響應(yīng)給http客戶端;如果是異步訂閱型指令
,則要求http客戶端發(fā)送的參數(shù)中必須包含回調(diào)url,以便中間件接收到服務(wù)A訂閱數(shù)據(jù)后,通過該回調(diào)響應(yīng)給http客戶端。
3.為了解決workerman中間層
與服務(wù)A
交互時(shí)的TCP邊界問題,使用workerman的特性,定制了與服務(wù)A的通訊協(xié)議(封裝了解包/發(fā)包協(xié)議)定制協(xié)議,中間件與服務(wù)A交互的方式如下:
$con = new \Workerman\Connection\AsyncTcpConnection('TestNL://14.103.39.10:3315');
$con->onConnect = function ($con) {
// 發(fā)送驗(yàn)證
$con->send("xxxx");
};
$con->onMessage = function ($con, $data) {
// 獲取解包/合包后的服務(wù)A響應(yīng)的數(shù)據(jù)
var_data($data)
};
那么問題就出現(xiàn)了,我使用workerman的定制協(xié)議,必須要用AsyncTcpConnection建立TCP客戶端,而且我又想提供http服務(wù)端及時(shí)的把數(shù)據(jù)響應(yīng)給http客戶端,這在workerman或者webman中根本不允許的啊,AsyncTcpConnection是異步的,我不能阻塞http的worker進(jìn)程用來等待AsyncTcpConnection的message的響應(yīng)啊。請(qǐng)問應(yīng)該如何解決這種情況呢?
這種需求應(yīng)該很常見,請(qǐng)問大佬們?nèi)绾螌?shí)現(xiàn)呢?
問題描述得很詳細(xì),非常贊,一看就有回答的欲望。
workerman做這樣的業(yè)務(wù)非常擅長(zhǎng),以下是示例代碼
為了異步執(zhí)行http回調(diào),要裝下 workerman/http-client
composer require workerman/http-client
以下是服務(wù)端代碼示例 start.php
<?php
use Workerman\Connection\TcpConnection;
use Workerman\Http\Client;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('http://0.0.0.0:2345');
// 保存訂閱主題->回調(diào)url的數(shù)組
global $subjects;
$subjects = [];
// 進(jìn)程啟動(dòng)時(shí)建立一個(gè)到A服務(wù)的連接
$worker->onWorkerStart = function() {
$con = new AsyncTcpConnection('ws://echo.websocket.org:443');
$con->transport = 'ssl'; // tcp協(xié)議不需要設(shè)置
$con->onWebSocketConnect = function(AsyncTcpConnection $con, ) {
$con->send('hello');
};
// 收到A服務(wù)的訂閱數(shù)據(jù)后轉(zhuǎn)發(fā)給訂閱者
$con->onMessage = function(AsyncTcpConnection $con, $subject) {
global $subjects;
foreach ($subjects[$subject] ?? [] as $callbackUrl) {
$httpClient = new Client();
$httpClient->post($callbackUrl, ['subject' => $subject]);
echo "callback $callbackUrl\n";
}
};
// 為了模擬A主動(dòng)推送訂閱消息,這里定時(shí)向A服務(wù)發(fā)送數(shù)據(jù),A會(huì)返回hello
\Workerman\Timer::add(10, function () use ($con) {
$con->send('hello');
});
$con->connect();
};
$worker->onMessage = function (TcpConnection $httpConnection, Request $request) {
$callbackUrl = $request->get('callback');
// 訂閱主題
$subject = $request->get('subject');
// 沒有回調(diào)url或者訂閱主題認(rèn)為是同步指令
if (!$callbackUrl || !$subject) {
$con = new AsyncTcpConnection('ws://echo.websocket.org:443');
$con->transport = 'ssl'; // tcp協(xié)議不需要設(shè)置
$con->onWebSocketConnect = function (AsyncTcpConnection $con) {
$con->send('hello');
};
$con->onMessage = function (AsyncTcpConnection $con, $data) use ($httpConnection) {
$httpConnection->send($data);
$con->close();
};
$con->connect();
return;
}
// 記錄主題和回調(diào)之間的關(guān)系
global $subjects;
if (!isset($subjects[$subject])) {
$subjects[$subject] = [];
}
if (in_array($callbackUrl, $subjects[$subject])) {
$httpConnection->send('already subscribed');
return;
}
$subjects[$subject][] = $callbackUrl;
$httpConnection->send('subscribed');
};
Worker::runAll();
php start.php start
同步請(qǐng)求url類似 http://127.0.0.1:2345
訂閱測(cè)試url類似 http://127.0.0.1:2345/?subject=hello&callback=<url地址>
為了方便測(cè)試,上面代碼用的wss協(xié)議測(cè)試,你需要改成自己的協(xié)議
感謝老大,思路太棒了,在worker進(jìn)程開啟的時(shí)候就建立一個(gè)tcp長(zhǎng)連接負(fù)責(zé)分發(fā)A服務(wù)的響應(yīng),然后http請(qǐng)求時(shí)使用短連接,查完就關(guān)閉連接。稍微改一下直接可以用了。接下來我ab壓測(cè)下單進(jìn)程的性能,以及是否可以多進(jìn)程,或者移植到webman上。
按我現(xiàn)在想的,應(yīng)該可以直接開啟多進(jìn)程增大并發(fā),因?yàn)槊總€(gè)進(jìn)程開啟的時(shí)候就建立tcp客戶端連接,然后http客戶端發(fā)送訂閱指令時(shí),無論subject分發(fā)到哪個(gè)worker進(jìn)程去A服務(wù)上訂閱都無所謂。
可能要改的是就是進(jìn)程開啟時(shí)的定時(shí)器,定時(shí)將存儲(chǔ)的訂閱指令發(fā)送到TCP服務(wù)端中
類似下面代碼
$worker->onWorkerStart = function() {
$con = new AsyncTcpConnection('ws://echo.websocket.org:443');
$con->transport = 'ssl'; // tcp協(xié)議不需要設(shè)置
$con->onWebSocketConnect = function(AsyncTcpConnection $con, ) {
$con->send('hello');
};
// 收到A服務(wù)的訂閱數(shù)據(jù)后轉(zhuǎn)發(fā)給訂閱者
$con->onMessage = function(AsyncTcpConnection $con, $subject) {
global $subjects;
foreach ($subjects[$subject] ?? [] as $callbackUrl) {
$httpClient = new Client();
$httpClient->post($callbackUrl, ['subject' => $subject]);
echo "callback $callbackUrl\n";
}
};
// 定時(shí)獲取全局變量中待發(fā)送的訂閱指令,發(fā)送后在onMessage中監(jiān)聽訂閱響應(yīng)
\Workerman\Timer::add(10, function () use ($con) {
global $subjects;
foreach ($subjects as $subject) {
$con->send('訂閱指令');
}
if (empty($subjects)) {
$con->send('發(fā)送心跳');
}
});
$con->connect();
};