項目需要實現延遲執(zhí)行,發(fā)布消息后延遲 5~10 秒執(zhí)行,
使用的 workerman/rabbitmq擴展,不要說哪個擴展能實現,已經用workerman/rabbitmq擴展寫了很多多列了,想再換擴展有點費勁
$client = Client::factory([
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'heartbeat' => 60,
])->connect();
$channel = $client->channel();
$exchange_name = 'delayed_demo_exchange';
$queue_name = 'demo_delay_queue';
// 創(chuàng)建延遲交換機,交換機類型為 direct
$channel->exchangeDeclare($exchange_name, 'x-delayed-message', false, true, false, false, [
'x-delayed-type' => 'direct',
]);
// 創(chuàng)建延遲隊列
$queueName = 'delayed_queue';
$channel->queueDeclare($queue_name, false, true, false, false);
$channel->queueBind($queue_name, $exchange_name);
// 發(fā)送消息
$message = 'Delayed Message at ' . time();
$channel->publish($message, ['delivery-mode' => 2,'x-delay' => 5000], '', $queue_name);
// 將消息發(fā)布到延遲交換機
$channel->publish($msg, $exchangeName, $queueName);
Bunny\Exception\ClientException: PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange
type in /Users/winds/Project/PHP/b_new/vendor/bunny/bunny/src/Bunny/ClientMethods.php:1280<br />
Stack trace:<br />
#0 /Users/winds/Project/PHP/b_new/vendor/bunny/bunny/src/Bunny/ClientMethods.php(1240): Bunny\AbstractClient->awaitQueueDeclareOk(1)<br />
#1 /Users/winds/Project/PHP/b_new/vendor/bunny/bunny/src/Bunny/ChannelMethods.php(111): Bunny\AbstractClient->queueDeclare(1, 'demo_delay_queu...', false, true, false, false, false, Array)<br />
#2 /Users/winds/Project/PHP/b_new/app/comm/service/pub/PubMq.php(506): Bunny\Channel->queueDeclare('demo_delay_queu...', false, true, false, false)<br />
#3 /Users/winds/Project/PHP/b_new/app/api/controller/CommonController.php(27): app\comm\service\pub\PubMq::demoPub(123456)<br />
#4 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(335): app\api\controller\CommonController->login(Object(support\Request))<br />
#5 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(358): Webman\App::Webman\{closure}(Object(support\Request))<br />
#6 /Users/winds/Project/PHP/b_new/app/middleware/GlobalMiddleware.php(15): Webman\App::Webman\{closure}(Object(support\Request))<br />
#7 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(351): app\middleware\GlobalMiddleware->process(Object(support\Request), Object(Closure))<br />
#8 /Users/winds/Project/PHP/b_new/vendor/webman/log/src/Middleware.php(96): Webman\App::Webman\{closure}(Object(support\Request))<br />
#9 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(351): Webman\Log\Middleware->process(Object(support\Request), Object(Closure))<br />
#10 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(662): Webman\App::Webman\{closure}(Object(support\Request))<br />
#11 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(156): Webman\App::findRoute(Object(Workerman\Connection\TcpConnection), '/api/v1/Common/...', 'POST/api/v1/Com...', Object(support\Request), 200)<br />
#12 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Connection/TcpConnection.php(749): Webman\App->onMessage(Object(Workerman\Connection\TcpConnection), Object(support\Request))<br />
#13 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Events/Select.php(400): Workerman\Connection\TcpConnection->baseRead(Resource id #349)<br />
#14 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(1735): Workerman\Events\Select->run()<br />
#15 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(1537): Workerman\Worker::forkOneWorkerForLinux(Object(Workerman\Worker))<br />
#16 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(1517): Workerman\Worker::forkWorkersForLinux()<br />
#17 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(585): Workerman\Worker::forkWorkers()<br />
#18 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/support/App.php(152): Workerman\Worker::runAll()<br />
#19 /Users/winds/Project/PHP/b_new/start.php(5): support\App::run()<br />
#20 {main}
系統(tǒng):macos 15.2
webman版本:1.6.8
workerman/rabbitmq版本:2.0.0
發(fā)布消息時,未指定 exchange
<?php
declare(strict_types=1);
namespace app\command;
use Bunny\Channel;
use Bunny\Message;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Output\OutputInterface;
use Workerman\Events\Fiber;
use Workerman\Events\Revolt;
use Workerman\RabbitMQ\Client;
use Workerman\Worker;
class DelayReceive extends Command
{
protected static $defaultName = 'delayReceive';
protected static $defaultDescription = 'delayReceive';
/**
* @return void
*/
protected function configure()
{
$this->addArgument('name', InputArgument::OPTIONAL, 'Name description');
}
/**
* @param InputInterface $input
* @param OutputInterface $output
* @return int
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$worker = new Worker();
$worker->eventLoop = Revolt::class;
$worker->onWorkerStart = function () {
$exchange_name = 'delayed_demo_exchange';
$queue_name = 'demo_delay_queue';
$routing_key = 'delayed_key';
$client = Client::factory([
'host' => 'rabbitmq',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'heartbeat' => 60,
])->connect();
$channel = $client->channel();
// 創(chuàng)建延遲隊列
$channel->queueDeclare($queue_name, false, true, false, false, false, [
[
'x-delayed-type' => 'direct',
]
]);
// Consumer
$channel->consume(
function (Message $message, Channel $channel, \Bunny\AbstractClient $client) {
echo " [>] Received ", $message->content, "Now:",date("Y-m-d H:i:s"), "\n";
},
$queue_name,
$routing_key,
false,
true,
false,
false,
[
'x-delayed-type' => 'direct',
]
);
$client->run();
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
};
Worker::runAll();
return self::SUCCESS;
}
}
<?php
namespace app\command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Output\OutputInterface;
use Workerman\RabbitMQ\Client;
class Delay extends Command
{
protected static $defaultName = 'delay';
protected static $defaultDescription = 'delay';
/**
* @return void
*/
protected function configure()
{
// $this->addArgument('name', InputArgument::OPTIONAL, 'Name description');
}
/**
* @param InputInterface $input
* @param OutputInterface $output
* @return int
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
// $name = $input->getArgument('name');
// $output->writeln('Hello delay');
$client = Client::factory([
'host' => 'rabbitmq',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'heartbeat' => 60,
])->connect();
$channel = $client->channel();
$exchange_name = 'delayed_demo_exchange';
$queue_name = 'demo_delay_queue';
$routing_key = 'delayed_key';
// 創(chuàng)建延遲交換機,交換機類型為 direct
$channel->exchangeDeclare(
$exchange_name,
'x-delayed-message',
false,
true,
false,
false,
true,
[
'x-delayed-type' => 'direct',
]
);
// 創(chuàng)建延遲隊列
$channel->queueDeclare($queue_name, false, true, false, false);
$channel->queueBind($queue_name, $exchange_name, $routing_key);
while (true) {
// 發(fā)送消息
$message = 'Delayed Message at ' . time();
$channel->publish($message, ['delivery-mode' => 2,'x-delay' => 5000], $exchange_name, $routing_key);
sleep(1); // 1s 循環(huán)一次
}
return self::SUCCESS;
}
}