*/ class Queue extends CliQueue { /** * @var string connection host */ public $host = 'localhost'; /** * @var int connection port */ public $port = PheanstalkInterface::DEFAULT_PORT; /** * @var string beanstalk tube */ public $tube = 'queue'; /** * @var string command class name */ public $commandClass = Command::class; /** * Listens queue and runs each job. * * @param bool $repeat whether to continue listening when queue is empty. * @param int $timeout number of seconds to wait for next message. * @return null|int exit code. * @internal for worker command only. * @since 2.0.2 */ public function run($repeat, $timeout = 0) { return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { while ($canContinue()) { if ($payload = $this->getPheanstalk()->reserveFromTube($this->tube, $timeout)) { $info = $this->getPheanstalk()->statsJob($payload); if ($this->handleMessage( $payload->getId(), $payload->getData(), $info->ttr, $info->reserves )) { $this->getPheanstalk()->delete($payload); } } elseif (!$repeat) { break; } } }); } /** * @inheritdoc */ public function status($id) { if (!is_numeric($id) || $id <= 0) { throw new InvalidArgumentException("Unknown message ID: $id."); } try { $stats = $this->getPheanstalk()->statsJob($id); if ($stats['state'] === 'reserved') { return self::STATUS_RESERVED; } return self::STATUS_WAITING; } catch (ServerException $e) { if ($e->getMessage() === 'Server reported NOT_FOUND') { return self::STATUS_DONE; } throw $e; } } /** * Removes a job by ID. * * @param int $id of a job * @return bool * @since 2.0.1 */ public function remove($id) { try { $job = $this->getPheanstalk()->peek($id); $this->getPheanstalk()->delete($job); return true; } catch (ServerException $e) { if (strpos($e->getMessage(), 'NOT_FOUND') === 0) { return false; } throw $e; } } /** * @inheritdoc */ protected function pushMessage($message, $ttr, $delay, $priority) { return $this->getPheanstalk()->putInTube( $this->tube, $message, $priority ?: PheanstalkInterface::DEFAULT_PRIORITY, $delay, $ttr ); } /** * @return object tube statistics */ public function getStatsTube() { return $this->getPheanstalk()->statsTube($this->tube); } /** * @return Pheanstalk */ protected function getPheanstalk() { if (!$this->_pheanstalk) { $this->_pheanstalk = new Pheanstalk($this->host, $this->port); } return $this->_pheanstalk; } private $_pheanstalk; }