1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
<?php
namespace Codeception\Lib\Driver;
use Codeception\Exception\TestRuntimeException;
use Codeception\Lib\Interfaces\Queue;
use Aws\Sqs\SqsClient;
use Aws\Credentials\Credentials;
class AmazonSQS implements Queue
{
protected $queue;
/**
* Connect to the queueing server. (AWS, Iron.io and Beanstalkd)
* @param array $config
* @return
*/
public function openConnection($config)
{
$params = [
'region' => $config['region'],
];
if (! empty($config['key']) && ! empty($config['secret'])) {
$params['credentials'] = new Credentials($config['key'], $config['secret']);
}
if (! empty($config['profile'])) {
$params['profile'] = $config['profile'];
}
if (! empty($config['version'])) {
$params['version'] = $config['version'];
}
if (! empty($config['endpoint'])) {
$params['endpoint'] = $config['endpoint'];
}
$this->queue = new SqsClient($params);
if (!$this->queue) {
throw new TestRuntimeException('connection failed or timed-out.');
}
}
/**
* Post/Put a message on to the queue server
*
* @param string $message Message Body to be send
* @param string $queue Queue Name
*/
public function addMessageToQueue($message, $queue)
{
$this->queue->sendMessage([
'QueueUrl' => $this->getQueueURL($queue),
'MessageBody' => $message,
]);
}
/**
* Return a list of queues/tubes on the queueing server
*
* @return array Array of Queues
*/
public function getQueues()
{
$queueNames = [];
$queues = $this->queue->listQueues(['QueueNamePrefix' => ''])->get('QueueUrls');
foreach ($queues as $queue) {
$tokens = explode('/', $queue);
$queueNames[] = $tokens[sizeof($tokens) - 1];
}
return $queueNames;
}
/**
* Count the current number of messages on the queue.
*
* @param $queue Queue Name
*
* @return int Count
*/
public function getMessagesCurrentCountOnQueue($queue)
{
return $this->queue->getQueueAttributes([
'QueueUrl' => $this->getQueueURL($queue),
'AttributeNames' => ['ApproximateNumberOfMessages'],
])->get('Attributes')['ApproximateNumberOfMessages'];
}
/**
* Count the total number of messages on the queue.
*
* @param $queue Queue Name
*
* @return int Count
*/
public function getMessagesTotalCountOnQueue($queue)
{
return $this->queue->getQueueAttributes([
'QueueUrl' => $this->getQueueURL($queue),
'AttributeNames' => ['ApproximateNumberOfMessages'],
])->get('Attributes')['ApproximateNumberOfMessages'];
}
public function clearQueue($queue)
{
$queueURL = $this->getQueueURL($queue);
while (true) {
$res = $this->queue->receiveMessage(['QueueUrl' => $queueURL]);
if (!$res->getPath('Messages')) {
return;
}
foreach ($res->getPath('Messages') as $msg) {
$this->queue->deleteMessage([
'QueueUrl' => $queueURL,
'ReceiptHandle' => $msg['ReceiptHandle']
]);
}
}
}
/**
* Get the queue/tube URL from the queue name (AWS function only)
*
* @param $queue Queue Name
*
* @return string Queue URL
*/
private function getQueueURL($queue)
{
$queues = $this->queue->listQueues(['QueueNamePrefix' => ''])->get('QueueUrls');
foreach ($queues as $queueURL) {
$tokens = explode('/', $queueURL);
if (strtolower($queue) == strtolower($tokens[sizeof($tokens) - 1])) {
return $queueURL;
}
}
throw new TestRuntimeException('queue [' . $queue . '] not found');
}
public function getRequiredConfig()
{
return ['region'];
}
public function getDefaultConfig()
{
return [];
}
}