require_once __DIR__ . '/../../vendor/autoload.php';
require_once __DIR__ . '/../common/common.php';
global $location_con;
$worker = new Worker();
$worker->count = 1;
$worker->onWorkerStart = function () {
dg_init('system', 'rabbitmq_data_sysnc', array());
global $location_con;
global $sting;
$string = '';
//建立連接
$location_con = new AsyncTcpConnection('ws://127.0.0.1:9395');
$location_con->onConnect = function ($connection) {
var_dump('onConnect ok');
//heartbeat($connection, 'rabbitmq_data', 0);
};
$location_con->connect();
$location_con->send('123123123123');
$conn_args = array(
'host' => '192.168.5.133',
'port' => '5672',
'login' => 'admin',
'password' => 'admin',
'vhost' => '/'
);
$e_name = 'S2C'; //交換機(jī)名
$q_name = 'S2C'; //隊列名
$k_route = 'S2C'; //路由key
//創(chuàng)建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//創(chuàng)建交換機(jī)
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_FANOUT); //direct類型
$ex->setFlags(AMQP_PASSIVE); //持久化
$ex->declareExchange();
//創(chuàng)建隊列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
$q->declareQueue();
$q->bind($e_name, $k_route);//綁定交換機(jī)與隊列,并指定路由鍵
********下面這段代碼不行
$q->consume(function($envelope, $queue) use($location_con){
echo "send data to 9037";
$location_con->send($envelope->getBody());
echo $string = $envelope->getBody();
});//自動ACK應(yīng)答
*************上面代碼不知道怎么寫
$conn->disconnect();
//連接斷開
$location_con->onClose = function ($connection) {
connect_close($connection, 'start_data_sync-walkthink');
};
};
Worker::runAll();
consume里是一個死循環(huán),一直循環(huán)消費(fèi)隊列的數(shù)據(jù)。因為代碼一直運(yùn)行在這個循環(huán)里,workerman永遠(yuǎn)無法得到控制權(quán),就無法把數(shù)據(jù)發(fā)送出去。
你可以用stream_socket_client 替代 AsyncTcpConnection ??吹綄Χ耸褂玫膚ebsocket協(xié)議,stream_socket_client 不好直接連websocket端口,對端最好再開一個text端口,stream_socket_client以text協(xié)議發(fā)送數(shù)據(jù)。
接收方開text協(xié)議,類似
$worker = new Worker('text://127.0.0.1:9395');
$worker->onMessage = function($con, $data){
$data = json_decode($data);
var_dump($data);
};
$q->consume(function($envelope, $queue) use($location_con){
$client = stream_socket_client('tcp://127.0.0.1:9395');
fwrite($client, json_encode($envelope->getBody())."\n");
echo $string = $envelope->getBody();
});
// 建立socket連接到內(nèi)部推送端口
$client = stream_socket_client('tcp://127.0.0.1:9395', $errno, $errmsg, 1);
$conn_args = array(
'host' => '192.168.5.133',
'port' => '5672',
'login' => 'admin',
'password' => 'admin',
'vhost' => '/',
'debug' =>true
);
$e_name = 'S2C'; //交換機(jī)名
$q_name = 'S2C'; //隊列名
$k_route = 'S2C'; //路由key
//創(chuàng)建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
//die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//創(chuàng)建交換機(jī)
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_FANOUT); //direct類型
$ex->setFlags(AMQP_PASSIVE); //持久化
$ex->declareExchange();
//創(chuàng)建隊列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
$q->declareQueue();
$q->bind($e_name, $k_route);//綁定交換機(jī)與隊列,并指定路由鍵
while(True){
$q->consume(function($envelope, $queue) use($client){
echo "send data to 9037";
fwrite($client, $envelope->getBody());
},AMQP_AUTOACK);//自動ACK應(yīng)答
}
//$conn->disconnect();