1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
<?php
/**
* @link http://www.yiiframework.com/
* @copyright Copyright (c) 2008 Yii Software LLC
* @license http://www.yiiframework.com/license/
*/
namespace yii\queue\gearman;
use yii\base\NotSupportedException;
use yii\queue\cli\Queue as CliQueue;
/**
* Gearman Queue.
*
* @author Roman Zhuravlev <zhuravljov@gmail.com>
*/
class Queue extends CliQueue
{
    /**
     * @var string host of the Gearman job server that the client and the worker connect to.
     */
    public $host = 'localhost';
    /**
     * @var int port of the Gearman job server that the client and the worker connect to.
     */
    public $port = 4730;
    /**
     * @var string name of the Gearman function that jobs are submitted to and consumed from.
     */
    public $channel = 'queue';
    /**
     * @var string command class name
     */
    public $commandClass = Command::class;

    /**
     * @var \GearmanClient|null client instance, created on first call of [[getClient()]].
     */
    private $_client;


    /**
     * Listens queue and runs each job.
     *
     * Registers a worker function for [[channel]] and keeps calling `GearmanWorker::work()`
     * for as long as the `$canContinue` callback supplied by `runWorker()` returns true.
     *
     * @param bool $repeat whether to continue listening when queue is empty.
     * @return null|int exit code.
     * @internal for worker command only.
     * @since 2.0.2
     */
    public function run($repeat)
    {
        return $this->runWorker(function (callable $canContinue) use ($repeat) {
            $worker = new \GearmanWorker();
            $worker->addServer($this->host, $this->port);
            $worker->addFunction($this->channel, function (\GearmanJob $payload) {
                // The workload has the form "<ttr>;<message>" (see pushMessage()). Split on
                // the first ';' only, because the message itself may contain that character.
                list($ttr, $message) = explode(';', $payload->workload(), 2);
                // The attempt number is always passed as 1.
                $this->handleMessage($payload->handle(), $message, $ttr, 1);
            });
            // Timeout is given in milliseconds: wait up to a second per iteration when
            // listening, otherwise give up almost immediately if nothing is queued.
            $worker->setTimeout($repeat ? 1000 : 1);
            while ($canContinue()) {
                $result = $worker->work();
                // In one-shot mode a falsy result from work() ends the loop.
                if (!$result && !$repeat) {
                    break;
                }
            }
        });
    }

    /**
     * @inheritdoc
     *
     * The job is submitted as a background task with the workload "<ttr>;<message>".
     * `$priority` may be `'high'` or `'low'`; any other value submits the job with
     * normal priority. The value returned by the Gearman client call is returned as is.
     *
     * @throws NotSupportedException if a non-empty `$delay` is given.
     */
    protected function pushMessage($message, $ttr, $delay, $priority)
    {
        if ($delay) {
            throw new NotSupportedException('Delayed work is not supported in the driver.');
        }

        $workload = "$ttr;$message";

        // Strict comparison on purpose: with loose comparison an integer 0 (PHP < 8) or a
        // boolean true would be treated as equal to 'high' and silently raise the priority.
        if ($priority === 'high') {
            return $this->getClient()->doHighBackground($this->channel, $workload);
        }
        if ($priority === 'low') {
            return $this->getClient()->doLowBackground($this->channel, $workload);
        }

        return $this->getClient()->doBackground($this->channel, $workload);
    }

    /**
     * @inheritdoc
     *
     * Maps the result of `GearmanClient::jobStatus()` to a queue status constant.
     */
    public function status($id)
    {
        // Per the GearmanClient::jobStatus() documentation, element 0 tells whether the
        // server knows the job and element 1 tells whether the job is currently running.
        $status = $this->getClient()->jobStatus($id);
        $known = $status[0];
        $running = $status[1];

        if (!$known) {
            return self::STATUS_DONE;
        }

        return $running ? self::STATUS_RESERVED : self::STATUS_WAITING;
    }

    /**
     * Returns the Gearman client, creating it and registering the configured server
     * on first use.
     *
     * @return \GearmanClient
     */
    protected function getClient()
    {
        if ($this->_client === null) {
            $client = new \GearmanClient();
            $client->addServer($this->host, $this->port);
            $this->_client = $client;
        }

        return $this->_client;
    }
}