我有個需求,系統(tǒng)每天定時同步釘釘數(shù)據(jù),管理人員也可以手動同步釘釘數(shù)據(jù)。
每個學校同步數(shù)據(jù)根據(jù)學校規(guī)模耗時大概1-5分鐘不等,現(xiàn)有近150所學校需要進行數(shù)據(jù)同步,如果不做異步任務處理,進程阻塞,整個業(yè)務肯定會受影響,因此,我想是否可以將接收到的這些任務,交給新開的進程去做(不影響業(yè)務自身的進程數(shù),即新開的進程是在cpu_count() * 2這個數(shù)量以外的),后端接收到任務后,直接返回任務提交成功的數(shù)據(jù)。
我大致看了下自定義進程的內(nèi)容,不是太明白。請老大指點,具體做哪幾步?
邏輯大概就是: 開一個自定義進程,里面監(jiān)聽text 協(xié)議,然后里面接收到正確同步消息的時候,去同步數(shù)據(jù).
你api接口收到同步請求,接著發(fā)送同步請求消息到自定義進程里面,然后響應前端任務提交成功.
首先非常感謝,我按照官方和大家的思路,做了如下三步。
process配置如下:
'sync_task' => [
'handler' => process\SyncTask::class,
'listen' => 'text://0.0.0.0:8888',
'count' => 5,
'reusePort' => true
],
SyncTask:
public function onMessage(AsyncTcpConnection $connection, $data)
{
Cache::set('send', $data, false);
$connection->send($data.'1111');
}
public function onClose(AsyncTcpConnection $connection)
{
Cache::set('onClose', true, false);
}
controller:
$task_connection = new AsyncTcpConnection('text://127.0.0.1:8888');
$task_connection->send('aa');
$task_connection->onMessage = function (AsyncTcpConnection $connection, $result) use ($task_connection) {
Cache::set('task', $result);
};
$task_connection->connect();
return $this->success('ok');
以上不知道是否正確,前端請求直接返回成功,但是緩存中沒有存入任何數(shù)據(jù),是沒有執(zhí)行異步任務嗎?不知道這樣哪里錯了,請大佬們指點。
1 控制器代碼用下面函數(shù)通知自定義進程.
2 SyncTask 里面onMessage 里面send 后直接調(diào)用$connection->close();關閉鏈接.
3 執(zhí)行異步任務是在自定義進程 SyncTask 里面調(diào)用同步數(shù)據(jù)代碼,你上面沒看到有調(diào)用邏輯.
/**
* @param $port int 端口
* @param array $data 發(fā)送的數(shù)據(jù)
* @return mixed
* 發(fā)送text協(xié)議
*/
function sendTextSocket($port, $data = [])
{
// 建立socket連接到內(nèi)部推送端口
$client = stream_socket_client('tcp://127.0.0.1:' . $port, $errno, $errmsg, 1);
// 發(fā)送數(shù)據(jù),注意5678端口是Text協(xié)議的端口,Text協(xié)議需要在數(shù)據(jù)末尾加上換行符
fwrite($client, json_encode($data) . "\n");
// 讀取推送結果
$res = fread($client, 8192);
}