国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

我想把rabbitmq 數(shù)據(jù) 轉(zhuǎn)發(fā)到worker上面 一直不行 一直在 consume() 方法執(zhí)行不往下執(zhí)行

zhou2021
require_once __DIR__ . '/../../vendor/autoload.php';
require_once __DIR__ . '/../common/common.php';
global $location_con;

$worker = new Worker();

$worker->count = 1;

$worker->onWorkerStart = function () {

    dg_init('system', 'rabbitmq_data_sysnc', array());
    global $location_con;
    global $sting;
    $string = '';
    //建立連接
    $location_con = new AsyncTcpConnection('ws://127.0.0.1:9395');
    $location_con->onConnect = function ($connection) {
        var_dump('onConnect ok');
        //heartbeat($connection, 'rabbitmq_data', 0);
    };
    $location_con->connect();
    $location_con->send('123123123123');
    $conn_args = array(
        'host' => '192.168.5.133',
        'port' => '5672',
        'login' => 'admin',
        'password' => 'admin',
        'vhost' => '/'
    );
    $e_name = 'S2C'; //交換機(jī)名
    $q_name = 'S2C'; //隊列名
    $k_route = 'S2C'; //路由key
    //創(chuàng)建連接和channel
    $conn = new AMQPConnection($conn_args);
    if (!$conn->connect()) {
        die("Cannot connect to the broker!\n");
    }
    $channel = new AMQPChannel($conn);

    //創(chuàng)建交換機(jī)
    $ex = new AMQPExchange($channel);
    $ex->setName($e_name);
    $ex->setType(AMQP_EX_TYPE_FANOUT); //direct類型
    $ex->setFlags(AMQP_PASSIVE); //持久化
    $ex->declareExchange();

    //創(chuàng)建隊列
    $q = new AMQPQueue($channel);
    $q->setName($q_name);
    $q->setFlags(AMQP_DURABLE); //持久化
    $q->declareQueue();
    $q->bind($e_name, $k_route);//綁定交換機(jī)與隊列,并指定路由鍵

********下面這段代碼不行

    $q->consume(function($envelope, $queue) use($location_con){
        echo "send data to 9037";
        $location_con->send($envelope->getBody());
        echo $string = $envelope->getBody();
    });//自動ACK應(yīng)答
*************上面代碼不知道怎么寫
    $conn->disconnect();
    //連接斷開
    $location_con->onClose = function ($connection) {
        connect_close($connection, 'start_data_sync-walkthink');
    };
};

Worker::runAll();
2830 2 0
2個回答

walkor 打賞

consume里是一個死循環(huán),一直循環(huán)消費(fèi)隊列的數(shù)據(jù)。因為代碼一直運(yùn)行在這個循環(huán)里,workerman永遠(yuǎn)無法得到控制權(quán),就無法把數(shù)據(jù)發(fā)送出去。

你可以用stream_socket_client 替代 AsyncTcpConnection ??吹綄Χ耸褂玫膚ebsocket協(xié)議,stream_socket_client 不好直接連websocket端口,對端最好再開一個text端口,stream_socket_client以text協(xié)議發(fā)送數(shù)據(jù)。

  • zhou2021 2021-03-29

    在worker 里面 stream_socket_client 怎么使用呢 有實(shí)例 嗎?或者 代碼?多謝

walkor 打賞

接收方開text協(xié)議,類似

$worker = new Worker('text://127.0.0.1:9395');
$worker->onMessage = function($con, $data){
    $data = json_decode($data);
    var_dump($data);
};
$q->consume(function($envelope, $queue) use($location_con){
  $client = stream_socket_client('tcp://127.0.0.1:9395');
  fwrite($client, json_encode($envelope->getBody())."\n");
  echo $string = $envelope->getBody();
});
  • zhou2021 2021-03-29

    // 建立socket連接到內(nèi)部推送端口
    $client = stream_socket_client('tcp://127.0.0.1:9395', $errno, $errmsg, 1);

    $conn_args = array(
    'host' => '192.168.5.133',
    'port' => '5672',
    'login' => 'admin',
    'password' => 'admin',
    'vhost' => '/',
    'debug' =>true
    );
    $e_name = 'S2C'; //交換機(jī)名
    $q_name = 'S2C'; //隊列名
    $k_route = 'S2C'; //路由key
    //創(chuàng)建連接和channel
    $conn = new AMQPConnection($conn_args);
    if (!$conn->connect()) {
    //die("Cannot connect to the broker!\n");
    }
    $channel = new AMQPChannel($conn);

    //創(chuàng)建交換機(jī)
    $ex = new AMQPExchange($channel);
    $ex->setName($e_name);
    $ex->setType(AMQP_EX_TYPE_FANOUT); //direct類型
    $ex->setFlags(AMQP_PASSIVE); //持久化
    $ex->declareExchange();

    //創(chuàng)建隊列
    $q = new AMQPQueue($channel);
    $q->setName($q_name);
    $q->setFlags(AMQP_DURABLE); //持久化
    $q->declareQueue();
    $q->bind($e_name, $k_route);//綁定交換機(jī)與隊列,并指定路由鍵

    while(True){
    $q->consume(function($envelope, $queue) use($client){
    echo "send data to 9037";
    fwrite($client, $envelope->getBody());
    },AMQP_AUTOACK);//自動ACK應(yīng)答
    }
    //$conn->disconnect();

年代過于久遠(yuǎn),無法發(fā)表回答
??