背景:
對(duì)方提供了個(gè)產(chǎn)品信息接口,只支持拉操作,限制QPS為20,產(chǎn)品規(guī)格和價(jià)格都可能實(shí)時(shí)變化,現(xiàn)在設(shè)計(jì)思路就是通過Timer把該接口的產(chǎn)品每隔5分鐘丟去隊(duì)列中,再通過隊(duì)列數(shù)量來進(jìn)行拉取。
問題:
隊(duì)列數(shù)量設(shè)置為10;curl中配合usleep通過記錄上次請(qǐng)求時(shí)間毫秒數(shù)來計(jì)算延遲請(qǐng)求,控制每次CURL相隔500毫秒。
但是對(duì)方監(jiān)控中發(fā)現(xiàn),實(shí)際請(qǐng)求QPS會(huì)達(dá)到30多或40,后來通過減少隊(duì)列數(shù)量到5,QPS才保持在20以內(nèi),
請(qǐng)問問各位吳彥祖對(duì)于這類限制QPS查詢頻率的有沒什么好的方法實(shí)現(xiàn)?
補(bǔ)充
感謝各位吳彥祖的回答,我現(xiàn)在實(shí)現(xiàn)方法類似:
Task.php
<?php
namespace process;
use support\Db;
use support\Redis;
use Webman\RedisQueue\Client;
use Workerman\Timer;
class Task
{
public function onWorkerStart()
{
Timer::add(30 * 60,function(){
$rs = Db::table('xxxxx')
->where(......)
->get();
foreach ($rs as $row){
/*防止上一輪任務(wù)沒跑完又重新添加進(jìn)來了
*/
$key = 'apiName_'.$row->key;
$isInQueue = Redis::get($key);
if($isInQueue) continue;
Redis::set($key,1);
Client::send('apiSync',['key'=>$row->key]);
}
});
}
}
Queue.php
<?php
namespace app\queue\redis;
use app\lib\helper;
use support\Redis;
use Webman\RedisQueue\Consumer;
class apiSync implements Consumer
{
public $queue = 'apiSync';
public $connection = 'default';
public function consume($data)
{
$key = $data['key'];
for($i = 0;$i<=15;$i++){
$checkInDate = date('Y-m-d',strtotime("+ ${i} day"));
helper::updatePrice($key);
}
Redis::del('apiName_'.$key);
return true;
}
}
helper:
<?php
namespace app\lib;
use Exception;
class apisync
{
const url = 'xxxxx';
private static $lstReqTime = 0;
public static function updatePrice($key){
return self::req('/xxxxxx',['key'=>$key],1000);
}
public static function req(string $uri,array $body=[],$qpsLimit=false): response
{
$reqTime = msectime();
if($qpsLimit !== false){
$lessTime = $reqTime - self::$lstReqTime*1;
if($lessTime < $qpsLimit){
usleep(($qpsLimit-$lessTime)*1000);
}
}
self::$lstReqTime = msectime();
$jsonStr = json_encode($body,JSON_UNESCAPED_UNICODE);
$ch = curl_init();
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_URL, self::url . $uri);
curl_setopt($ch, CURLOPT_POSTFIELDS, $jsonStr);
curl_setopt($ch, CURLOPT_TIMEOUT, 5);
curl_setopt($ch, CURLOPT_ENCODING, 'gzip,deflate');
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_HTTPHEADER, array(
'Content-Type: application/json; charset=utf-8',
'Content-Length: ' . strlen($jsonStr)
)
);
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
return new response($httpCode,$response);
}
}
實(shí)現(xiàn)過程:
通過Timer設(shè)定30分鐘,全量查一次需要拉取的產(chǎn)品,丟入redis隊(duì)列中,其中防止重復(fù)入列,所以增加了個(gè)判斷任務(wù)是否存在于隊(duì)列中。
隊(duì)列發(fā)起請(qǐng)求的時(shí)候,會(huì)計(jì)算本次請(qǐng)求毫秒數(shù) - 上次請(qǐng)求毫秒數(shù),如果大于1秒,則執(zhí)行查詢,否則通過usleep來延遲本次查詢。
但是現(xiàn)在實(shí)際出現(xiàn)情況是:
開5個(gè)redis消費(fèi),設(shè)定查詢間隔是1000毫秒,實(shí)際產(chǎn)生QPS會(huì)達(dá)到10~20,理論應(yīng)該QPS是3~8吧?畢竟usleep并不是那么精準(zhǔn)。
另外就是redis隊(duì)列確實(shí)會(huì)堆積,還有通過php start.php status -d 會(huì)看到隊(duì)列一堆busy,幾分鐘后status拋出錯(cuò)誤退出了。
也是用 redis,稍微調(diào)整下
$key = 'lock:queue';
while($r = \support\Redis::zIncrBy($key, 1, time())) {
if ($r > 20) {
sleep(1);
continue; // 請(qǐng)求未發(fā)出,繼續(xù)循環(huán)
}
// do something
break; // 請(qǐng)求已發(fā)出,結(jié)束循環(huán)
}