*/ class Queue extends CliQueue { public $host = 'localhost'; public $port = 5672; public $user = 'guest'; public $password = 'guest'; public $queueName = 'queue'; public $exchangeName = 'exchange'; public $vhost = '/'; /** * @var string command class name */ public $commandClass = Command::class; /** * @var AMQPStreamConnection */ protected $connection; /** * @var AMQPChannel */ protected $channel; /** * @inheritdoc */ public function init() { parent::init(); Event::on(BaseApp::class, BaseApp::EVENT_AFTER_REQUEST, function () { $this->close(); }); } /** * Listens amqp-queue and runs new jobs. */ public function listen() { $this->open(); $callback = function (AMQPMessage $payload) { $id = $payload->get('message_id'); list($ttr, $message) = explode(';', $payload->body, 2); if ($this->handleMessage($id, $message, $ttr, 1)) { $payload->delivery_info['channel']->basic_ack($payload->delivery_info['delivery_tag']); } }; $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback); while (count($this->channel->callbacks)) { $this->channel->wait(); } } /** * @inheritdoc */ protected function pushMessage($message, $ttr, $delay, $priority) { if ($delay) { throw new NotSupportedException('Delayed work is not supported in the driver.'); } if ($priority !== null) { throw new NotSupportedException('Job priority is not supported in the driver.'); } $this->open(); $id = uniqid('', true); $this->channel->basic_publish( new AMQPMessage("$ttr;$message", [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'message_id' => $id, ]), $this->exchangeName ); return $id; } /** * @inheritdoc */ public function status($id) { throw new NotSupportedException('Status is not supported in the driver.'); } /** * Opens connection and channel. */ protected function open() { if ($this->channel) { return; } $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost); $this->channel = $this->connection->channel(); $this->channel->queue_declare($this->queueName, false, true, false, false); $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false); $this->channel->queue_bind($this->queueName, $this->exchangeName); } /** * Closes connection and channel. */ protected function close() { if (!$this->channel) { return; } $this->channel->close(); $this->connection->close(); } }