国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

workerman實(shí)現(xiàn)tcp客戶端中間件,該用什么樣的方法提供服務(wù)?

xiaopi

問題描述

現(xiàn)有第三方服務(wù),實(shí)現(xiàn)了TCP服務(wù)端,稱為服務(wù)A。基本用法就是通過創(chuàng)建TCP客戶端連接服務(wù)A,對(duì)服務(wù)A發(fā)送各種指令,獲得響應(yīng),以及訂閱服務(wù)A的各種事件,服務(wù)A會(huì)主動(dòng)推送事件通知過來。
我想用wokerman的做一個(gè)中間層,負(fù)責(zé)與服務(wù)A通信,并以wokerman為核心提供一些服務(wù),共內(nèi)部的其他程序使用。流程為本地其他項(xiàng)目————調(diào)用workerman中間層————調(diào)用服務(wù)A,其中wokerman中間層負(fù)責(zé)將服務(wù)A的響應(yīng)結(jié)果返回給本地其他項(xiàng)目

我想知道wokerman中間層提供何種服務(wù)才能簡(jiǎn)便供本地其他項(xiàng)目調(diào)用?
已知與服務(wù)A進(jìn)行TCP交互,TCP服務(wù)端會(huì)響應(yīng)的數(shù)據(jù)格式為兩種,一種是同步命令型,一種是異步訂閱型。
同步命令型:指的是TCP客戶端發(fā)出命令指令,TCP服務(wù)端會(huì)阻塞的返回指令的執(zhí)行結(jié)果,與http協(xié)議一樣 請(qǐng)求/響應(yīng)
異步訂閱型:指的是TCP客戶端發(fā)出訂閱指令,TCP服務(wù)端會(huì)立即返回訂閱結(jié)果,并定時(shí)向TCP客戶端發(fā)送訂閱的數(shù)據(jù),直到客戶端收集了足夠的數(shù)據(jù),主動(dòng)取消訂閱或者斷開連接為止。

初步實(shí)現(xiàn)wokerman中間層的想法是:
1.中間層搭建一個(gè)http服務(wù)端,用來接收本地其他項(xiàng)目的url請(qǐng)求(指令)
2.中間層接收到url發(fā)送過來的指令后,判斷是否為同步命令型指令,如果是,則建立TCP客戶端與服務(wù)A交互,并同步等待服務(wù)A的響應(yīng)結(jié)果,并將響應(yīng)結(jié)果作為response響應(yīng)給http客戶端;如果是異步訂閱型指令,則要求http客戶端發(fā)送的參數(shù)中必須包含回調(diào)url,以便中間件接收到服務(wù)A訂閱數(shù)據(jù)后,通過該回調(diào)響應(yīng)給http客戶端。
3.為了解決workerman中間層服務(wù)A交互時(shí)的TCP邊界問題,使用workerman的特性,定制了與服務(wù)A的通訊協(xié)議(封裝了解包/發(fā)包協(xié)議)定制協(xié)議,中間件與服務(wù)A交互的方式如下:

$con = new \Workerman\Connection\AsyncTcpConnection('TestNL://14.103.39.10:3315');
    $con->onConnect = function ($con) {
        // 發(fā)送驗(yàn)證
        $con->send("xxxx");
    };
    $con->onMessage = function ($con, $data) {
        // 獲取解包/合包后的服務(wù)A響應(yīng)的數(shù)據(jù)
        var_data($data)
    };

那么問題就出現(xiàn)了,我使用workerman的定制協(xié)議,必須要用AsyncTcpConnection建立TCP客戶端,而且我又想提供http服務(wù)端及時(shí)的把數(shù)據(jù)響應(yīng)給http客戶端,這在workerman或者webman中根本不允許的啊,AsyncTcpConnection是異步的,我不能阻塞http的worker進(jìn)程用來等待AsyncTcpConnection的message的響應(yīng)啊。請(qǐng)問應(yīng)該如何解決這種情況呢?

為此你搜索到了哪些方案及不適用的原因

這種需求應(yīng)該很常見,請(qǐng)問大佬們?nèi)绾螌?shí)現(xiàn)呢?

1047 3 4
3個(gè)回答

walkor 打賞

問題描述得很詳細(xì),非常贊,一看就有回答的欲望。
workerman做這樣的業(yè)務(wù)非常擅長(zhǎng),以下是示例代碼

安裝 workerman/http-client

為了異步執(zhí)行http回調(diào),要裝下 workerman/http-client

composer require workerman/http-client

服務(wù)端代碼

以下是服務(wù)端代碼示例 start.php

<?php

use Workerman\Connection\TcpConnection;
use Workerman\Http\Client;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('http://0.0.0.0:2345');

// 保存訂閱主題->回調(diào)url的數(shù)組
global $subjects;
$subjects = [];

// 進(jìn)程啟動(dòng)時(shí)建立一個(gè)到A服務(wù)的連接
$worker->onWorkerStart = function() {
    $con = new AsyncTcpConnection('ws://echo.websocket.org:443');
    $con->transport = 'ssl'; // tcp協(xié)議不需要設(shè)置
    $con->onWebSocketConnect = function(AsyncTcpConnection $con, ) {
        $con->send('hello');
    };
    // 收到A服務(wù)的訂閱數(shù)據(jù)后轉(zhuǎn)發(fā)給訂閱者
    $con->onMessage = function(AsyncTcpConnection $con, $subject) {
        global $subjects;
        foreach ($subjects[$subject] ?? [] as $callbackUrl) {
            $httpClient = new Client();
            $httpClient->post($callbackUrl, ['subject' => $subject]);
            echo "callback $callbackUrl\n";
        }
    };
    // 為了模擬A主動(dòng)推送訂閱消息,這里定時(shí)向A服務(wù)發(fā)送數(shù)據(jù),A會(huì)返回hello
    \Workerman\Timer::add(10, function () use ($con) {
        $con->send('hello');
    });
    $con->connect();
};

$worker->onMessage = function (TcpConnection $httpConnection, Request $request) {
    $callbackUrl = $request->get('callback');
    // 訂閱主題
    $subject = $request->get('subject');
    // 沒有回調(diào)url或者訂閱主題認(rèn)為是同步指令
    if (!$callbackUrl || !$subject) {
        $con = new AsyncTcpConnection('ws://echo.websocket.org:443');
        $con->transport = 'ssl'; // tcp協(xié)議不需要設(shè)置
        $con->onWebSocketConnect = function (AsyncTcpConnection $con) {
            $con->send('hello');
        };
        $con->onMessage = function (AsyncTcpConnection $con, $data) use ($httpConnection) {
            $httpConnection->send($data);
            $con->close();
        };
        $con->connect();
        return;
    }
    // 記錄主題和回調(diào)之間的關(guān)系
    global $subjects;
    if (!isset($subjects[$subject])) {
        $subjects[$subject] = [];
    }
    if (in_array($callbackUrl, $subjects[$subject])) {
        $httpConnection->send('already subscribed');
        return;
    }
    $subjects[$subject][] = $callbackUrl;
    $httpConnection->send('subscribed');
};

Worker::runAll();

啟動(dòng)

php start.php start

測(cè)試

同步請(qǐng)求url類似 http://127.0.0.1:2345
訂閱測(cè)試url類似 http://127.0.0.1:2345/?subject=hello&callback=<url地址>

說明

為了方便測(cè)試,上面代碼用的wss協(xié)議測(cè)試,你需要改成自己的協(xié)議

  • xiaopi 2024-04-28

    感謝老大,思路太棒了,在worker進(jìn)程開啟的時(shí)候就建立一個(gè)tcp長(zhǎng)連接負(fù)責(zé)分發(fā)A服務(wù)的響應(yīng),然后http請(qǐng)求時(shí)使用短連接,查完就關(guān)閉連接。稍微改一下直接可以用了。接下來我ab壓測(cè)下單進(jìn)程的性能,以及是否可以多進(jìn)程,或者移植到webman上。
    按我現(xiàn)在想的,應(yīng)該可以直接開啟多進(jìn)程增大并發(fā),因?yàn)槊總€(gè)進(jìn)程開啟的時(shí)候就建立tcp客戶端連接,然后http客戶端發(fā)送訂閱指令時(shí),無論subject分發(fā)到哪個(gè)worker進(jìn)程去A服務(wù)上訂閱都無所謂。

    可能要改的是就是進(jìn)程開啟時(shí)的定時(shí)器,定時(shí)將存儲(chǔ)的訂閱指令發(fā)送到TCP服務(wù)端中
    類似下面代碼

    $worker->onWorkerStart = function() {
        $con = new AsyncTcpConnection('ws://echo.websocket.org:443');
        $con->transport = 'ssl'; // tcp協(xié)議不需要設(shè)置
        $con->onWebSocketConnect = function(AsyncTcpConnection $con, ) {
            $con->send('hello');
        };
        // 收到A服務(wù)的訂閱數(shù)據(jù)后轉(zhuǎn)發(fā)給訂閱者
        $con->onMessage = function(AsyncTcpConnection $con, $subject) {
            global $subjects;
            foreach ($subjects[$subject] ?? [] as $callbackUrl) {
                $httpClient = new Client();
                $httpClient->post($callbackUrl, ['subject' => $subject]);
                echo "callback $callbackUrl\n";
            }
        };
        //  定時(shí)獲取全局變量中待發(fā)送的訂閱指令,發(fā)送后在onMessage中監(jiān)聽訂閱響應(yīng)
        \Workerman\Timer::add(10, function () use ($con) {
            global $subjects;
            foreach ($subjects as $subject) {
                $con->send('訂閱指令');
            }
            if (empty($subjects)) {
                $con->send('發(fā)送心跳');
            }
        });
        $con->connect();
    };
walkor 打賞

多進(jìn)程時(shí)要加一句 $worker->reusePort = true;

$worker = new Worker('http://0.0.0.0:2345');
$worker->reusePort = true;
JackDx

學(xué)習(xí)了。

  • 暫無評(píng)論
年代過于久遠(yuǎn),無法發(fā)表回答
??