<?php
/**
* @link http://www.yiiframework.com/
* @copyright Copyright (c) 2008 Yii Software LLC
* @license http://www.yiiframework.com/license/
*/
namespace yii\queue\beanstalk;
use Pheanstalk\Exception\ServerException;
use Pheanstalk\Pheanstalk;
use Pheanstalk\PheanstalkInterface;
use yii\base\InvalidArgumentException;
use yii\queue\cli\Queue as CliQueue;
/**
 * Beanstalk Queue.
 *
 * Queue driver that keeps its jobs in a beanstalk tube and talks to the server
 * through a lazily created Pheanstalk client.
 *
 * @author Roman Zhuravlev <zhuravljov@gmail.com>
 */
class Queue extends CliQueue
{
    /**
     * @var string connection host
     */
    public $host = 'localhost';
    /**
     * @var int connection port
     */
    public $port = PheanstalkInterface::DEFAULT_PORT;
    /**
     * @var string beanstalk tube
     */
    public $tube = 'queue';
    /**
     * @var string command class name
     */
    public $commandClass = Command::class;

    /**
     * @var Pheanstalk|null client instance, created on first use by [[getPheanstalk()]]
     */
    private $_pheanstalk;


    /**
     * Listens queue and runs each job.
     *
     * Each reserved job is passed to `handleMessage()` together with the `ttr` and
     * `reserves` values taken from the job statistics. The job is deleted from the
     * tube only when the handler returns a truthy result.
     *
     * @param bool $repeat whether to continue listening when queue is empty.
     * @param int $timeout number of seconds to wait for next message.
     * @return null|int exit code.
     * @internal for worker command only.
     * @since 2.0.2
     */
    public function run($repeat, $timeout = 0)
    {
        return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
            while ($canContinue()) {
                $payload = $this->getPheanstalk()->reserveFromTube($this->tube, $timeout);
                if (!$payload) {
                    // Nothing was reserved within the timeout: stop unless asked to keep listening.
                    if (!$repeat) {
                        break;
                    }
                    continue;
                }

                $info = $this->getPheanstalk()->statsJob($payload);
                $handled = $this->handleMessage(
                    $payload->getId(),
                    $payload->getData(),
                    $info->ttr,
                    $info->reserves
                );
                if ($handled) {
                    $this->getPheanstalk()->delete($payload);
                }
            }
        });
    }

    /**
     * @inheritdoc
     *
     * A job the server no longer knows is reported as done; a job in the `reserved`
     * state is reported as reserved; every other known job is reported as waiting.
     *
     * @throws InvalidArgumentException when $id is not a positive number.
     * @throws ServerException when the server fails for a reason other than NOT_FOUND.
     */
    public function status($id)
    {
        if (!is_numeric($id) || $id <= 0) {
            throw new InvalidArgumentException("Unknown message ID: $id.");
        }

        try {
            $stats = $this->getPheanstalk()->statsJob($id);
        } catch (ServerException $e) {
            if ($this->isNotFound($e)) {
                return self::STATUS_DONE;
            }

            throw $e;
        }

        return $stats['state'] === 'reserved' ? self::STATUS_RESERVED : self::STATUS_WAITING;
    }

    /**
     * Removes a job by ID.
     *
     * @param int $id of a job
     * @return bool `true` when the job was deleted, `false` when the server reported
     * NOT_FOUND while looking the job up or while deleting it.
     * @throws ServerException when the server fails for a reason other than NOT_FOUND.
     * @since 2.0.1
     */
    public function remove($id)
    {
        try {
            $job = $this->getPheanstalk()->peek($id);
            $this->getPheanstalk()->delete($job);

            return true;
        } catch (ServerException $e) {
            // NOT_FOUND can come from either call: the job may be missing at peek time
            // or may no longer be deletable by the time delete is issued.
            if ($this->isNotFound($e)) {
                return false;
            }

            throw $e;
        }
    }

    /**
     * @inheritdoc
     *
     * @return mixed whatever `putInTube()` returns (presumably the ID of the new job;
     * confirm against the Pheanstalk version in use).
     */
    protected function pushMessage($message, $ttr, $delay, $priority)
    {
        // Only a missing priority falls back to the default. A truthiness test (`?:`)
        // would also replace 0, which beanstalk treats as the most urgent priority.
        if (!is_numeric($priority)) {
            $priority = PheanstalkInterface::DEFAULT_PRIORITY;
        }

        return $this->getPheanstalk()->putInTube(
            $this->tube,
            $message,
            $priority,
            $delay,
            $ttr
        );
    }

    /**
     * @return object tube statistics
     */
    public function getStatsTube()
    {
        return $this->getPheanstalk()->statsTube($this->tube);
    }

    /**
     * Returns the Pheanstalk client, creating it from [[host]] and [[port]] on first use.
     *
     * @return Pheanstalk
     */
    protected function getPheanstalk()
    {
        if (!$this->_pheanstalk) {
            $this->_pheanstalk = new Pheanstalk($this->host, $this->port);
        }

        return $this->_pheanstalk;
    }

    /**
     * Tells whether a server exception carries the beanstalk NOT_FOUND response.
     *
     * The token is searched anywhere in the message because its position depends on
     * the command that failed (e.g. "Server reported NOT_FOUND" versus a message that
     * starts with "NOT_FOUND").
     *
     * @param ServerException $e exception thrown by the Pheanstalk client.
     * @return bool
     */
    private function isNotFound(ServerException $e)
    {
        return strpos($e->getMessage(), 'NOT_FOUND') !== false;
    }
}