比如一個(gè)表里有個(gè)3,5萬(wàn)個(gè)數(shù)據(jù)遍歷投遞很慢
foreach ($users as $data) {
$queue = 'sms';
Redis::send($queue, $data);
}
這樣很慢有啥解決方案嗎
首先 workerman/redis-queue
隊(duì)列組件是利用 Redis
的 List
和 Sorted Set
實(shí)現(xiàn)的,因此在客戶端可以直接使用原生 Redis
的管道功能,文檔地址:在非workerman環(huán)境向隊(duì)列發(fā)送消息
自己?jiǎn)为?dú)封裝一個(gè)函數(shù)去用即可:
function multi_redis_queue_send($redis, $queue, $multiData, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; //1.0.5版本之前為redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';//1.0.5版本之前為redis-queue-delayed
// 創(chuàng)建管道
$pipe = $redis->multi(Redis::PIPELINE);
foreach ($multiData as $data) {
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => 0,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
$pipe->zAdd($queue_delay, $now + $delay, $package_str);
} else {
$pipe->lPush($queue_waiting.$queue, $package_str);
}
}
// 執(zhí)行管道中的命令
$results = $pipe->exec();
// 需要注意的是,管道操作并不是原子操作,雖然管道中的所有命令一起執(zhí)行,但是每個(gè)命令的執(zhí)行結(jié)果仍然會(huì)返回給客戶端。因此,在使用管道操作時(shí),需要注意處理每個(gè)命令的執(zhí)行結(jié)果。
// 處理每個(gè)命令的執(zhí)行結(jié)果
foreach ($results as $result) {
// ...
}
}
在處理每個(gè)命令的執(zhí)行結(jié)果時(shí),需要注意以下幾點(diǎn):
如果命令執(zhí)行成功,對(duì)應(yīng)的執(zhí)行結(jié)果是一個(gè)非負(fù)整數(shù),表示執(zhí)行成功的命令數(shù)量。
如果命令執(zhí)行失敗,對(duì)應(yīng)的執(zhí)行結(jié)果是一個(gè)RedisException異常對(duì)象,需要對(duì)異常進(jìn)行處理。
在使用EXEC命令執(zhí)行管道操作之前,不能執(zhí)行其他Redis命令,否則會(huì)中斷管道操作,導(dǎo)致管道中的命令沒(méi)有被執(zhí)行。
最后的解決方案分享一下還在測(cè)試呢,不知道還又什么問(wèn)題,大佬們看下
function multi_redis_queue_send($queue, $multiData, $delay = 0) {
Redis::pipeline(function ($pipe) use ($queue, $multiData, $delay) {
$queue_waiting = '{redis-queue}-waiting'; //1.0.5版本之前為redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';//1.0.5版本之前為redis-queue-delayed
foreach ($multiData as $data) {
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => 0,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
$pipe->zAdd($queue_delay, $now + $delay, $package_str);
} else {
$pipe->lPush($queue_waiting.$queue, $package_str);
}
}
});
}