workerman/http-client 增加任務處理進度及全部處理結束后回調方法
<?php
use \Workerman\Worker;
/**
 * Callback-based wait group for tracking a batch of asynchronous tasks.
 *
 * Usage: register the expected number of tasks with Add(), register a
 * completion callback with Wait(), and call Done() once per finished task.
 * When the finished count reaches the registered total, the completion
 * callback is invoked with the total task count and the elapsed time in
 * seconds (measured from the first Add() call).
 */
class waitGroup {
    /** Set to true by the constructor; completion is only triggered while it is true. */
    public bool $isWorkering = false;

    /** Total number of tasks registered through Add(). */
    public int $workerCounts = 0;

    /** Number of tasks reported finished through Done(). */
    public int $workerOkCounts = 0;

    /** Completion callback registered through Wait(), or null when none is set. */
    public $func = null;

    /**
     * microtime(true) value captured by the first Add() call; 0.0 until then.
     *
     * Declared as float on purpose: microtime(true) returns a float, and an
     * int property would truncate it to whole seconds, skewing the elapsed
     * time passed to the completion callback by up to one second.
     */
    private float $startTime = 0.0;

    public function __construct() {
        $this->isWorkering = true;
    }

    /**
     * Increase the number of expected tasks.
     *
     * The first call also records the start time used for the elapsed-time
     * value passed to the completion callback.
     *
     * @param int $counts Number of tasks to add.
     */
    public function Add(int $counts = 1) {
        if ($this->startTime === 0.0) {
            $this->startTime = microtime(true);
        }
        $this->workerCounts += $counts;
    }

    /**
     * Mark one task as finished.
     *
     * If a progress callback is given it is invoked as
     * $func(int $workerOkCounts, int $workerCounts) after the finished count
     * has been incremented. If a completion callback is registered and the
     * finished count has reached the total, the completion callback runs.
     *
     * @param callable|null $func Optional progress callback.
     */
    public function Done($func = null) {
        $this->workerOkCounts += 1;
        if ($func) {
            $func($this->workerOkCounts, $this->workerCounts);
        }
        if ($this->isWorkering && $this->func && $this->workerOkCounts >= $this->workerCounts) {
            $this->complete();
        }
    }

    /**
     * Register the callback to run once all tasks have finished.
     *
     * The callback is invoked as $func(int $workerCounts, float $seconds).
     * If Add() has already been called and every registered task is already
     * finished (including an empty batch registered with Add(0)), the
     * callback fires immediately instead of being silently dropped.
     *
     * @param callable|null $func Completion callback.
     */
    public function Wait($func = null) {
        $this->func = $func;
        // startTime > 0.0 means Add() has run, so "finished >= total" is a
        // real completion and not just the initial 0 >= 0 state.
        $alreadyFinished = $this->startTime > 0.0
            && $this->workerOkCounts >= $this->workerCounts;
        if ($func && $this->isWorkering && $alreadyFinished) {
            $this->complete();
        }
    }

    /**
     * Invoke the completion callback with the total task count and the
     * elapsed time in seconds since the first Add() call.
     */
    private function complete(): bool {
        ($this->func)($this->workerCounts, microtime(true) - $this->startTime);
        return true;
    }
}
// Single-process worker that fires a batch of HTTP GET requests when it
// starts and reports per-request progress plus a final summary.
// NOTE(review): no Worker::runAll() call is visible in this excerpt; confirm
// the surrounding start script invokes it, otherwise the worker never starts.
$worker = new Worker();
$worker->count = 1;
$worker->onWorkerStart = function () {
    $options = [
        'max_conn_per_addr' => 30, // max concurrent connections kept per address
        'keepalive_timeout' => 15, // close a connection after this long without traffic
        'connect_timeout' => 30, // connect timeout
        'timeout' => 30, // timeout while waiting for a response
    ];
    $http = new Workerman\Http\Client($options);

    // Build the list of URLs to request (i = 1..100).
    $urlArr = [];
    for ($i = 1; $i <= 100; $i++) {
        $urlArr[] = 'http://xxxxx.com/test?i=' . $i;
    }

    // The wait group lives inside the handler so every request callback can
    // capture it; register the total and the completion callback before any
    // request is issued.
    $waitGroup = new waitGroup();
    $waitGroup->Add(count($urlArr));
    $waitGroup->Wait(function ($workerCounts, $workerTimer) {
        echo "任務(wù)已完成:" ,$workerCounts , ",耗時(shí):", sprintf("%0.3f", $workerTimer) , "秒\n";
    });

    foreach ($urlArr as $url) {
        $http->get($url, function ($response) use ($waitGroup) {
            // Success: dump status code and body, then count the task as done.
            var_dump(['code' => $response->getStatusCode(), 'body' => (string) $response->getBody()]);
            $waitGroup->Done(function ($workerOkCounts, $workerCounts) {
                echo "任務(wù)進(jìn)度:[ok] " , $workerOkCounts , "/" , $workerCounts,"\n";
            });
        }, function ($exception) use ($waitGroup) {
            // Failure still counts as a finished task so the completion
            // callback can fire once every request has settled.
            $waitGroup->Done(function ($workerOkCounts, $workerCounts) {
                echo "任務(wù)進(jìn)度:[error] " , $workerOkCounts , "/" , $workerCounts,"\n";
            });
            echo $exception;
        });
    }
};
個評論
年代過于久遠,無法發表評論