1 : <?php
2 :
3 1 : if (!class_exists('GearmanClient') || defined('TEST')) {
4 1 : require_once 'Net/Gearman/Client.php';
5 1 : }
6 :
7 : /**
8 : * Ibuildings Gearman Magento Queue Model
9 : *
10 : * @copyright (c) 2010 Ibuildings UK Ltd.
11 : * @author Michael Davey
12 : * @version 0.1.0
13 : * @package Ibuildings
14 : * @subpackage Gearman
15 : * @license https://github.com/ibuildings/Magento-Gearman-Module/blob/master/LICENCE
16 : */
17 : class Ibuildings_Gearman_Model_Queue extends Mage_Core_Model_Abstract
18 1 : {
19 : /**
20 : * Reference to the queue object for sending jobs and fetching status
21 : * @var GearmanClient|Net_Gearman_Client
22 : */
23 : private $_client;
24 :
/**
 * Builds the queue model with a default Gearman client
 *
 * Delegates to setGearmanClient() without arguments, so the client is
 * configured from whatever options that method resolves on its own.
 *
 * @see setGearmanClient()
 */
public function __construct()
{
    // No explicit options: let setGearmanClient() look them up itself.
    $this->setGearmanClient();
}
34 :
/**
 * Creates the Gearman client described by the configuration options
 *
 * When no options are passed they are read from the 'gearman_options'
 * store config. The comma-separated 'server' and 'port' values are
 * combined into "host:port" entries: ports are paired with servers by
 * position when both lists have the same length, otherwise the first
 * port is applied to every server. Whitespace around list entries is
 * ignored.
 *
 * The optional 'type' key forces the Net_Gearman client when it is set
 * to 'net' and that class is available; in every other case the
 * GearmanClient class is used. The key was introduced for testing
 * purposes and should be used with caution, as Net_Gearman & Gearman
 * extension queues cannot be used interchangeably.
 *
 * NOTE (original author's observation, not documented upstream): the
 * addServer/s() method does not seem to perform host lookup, so servers
 * should be given as dotted-quad IPs.
 *
 * @param array|null $opts Configuration options; expects the keys
 *                         $opts['gearman']['server'] and
 *                         $opts['gearman']['port'], plus the optional
 *                         $opts['gearman']['type']
 * @return void
 */
public function setGearmanClient($opts = null)
{
    if (is_null($opts)) {
        $opts = Mage::getStoreConfig('gearman_options');
    }
    $config = $opts['gearman'];

    // Tolerate "a, b" style lists by stripping whitespace per entry.
    $hosts = array_map('trim', explode(',', $config['server']));
    $ports = array_map('trim', explode(',', $config['port']));

    // Lists of different length cannot be paired up, so fall back to
    // sharing the first port between all servers.
    $sharedPort = count($hosts) !== count($ports);

    $servers = array();
    foreach ($hosts as $index => $host) {
        $servers[] = $host . ':' . ($sharedPort ? $ports[0] : $ports[$index]);
    }

    // 'type' is optional, so it must be tested with isset() before it is
    // compared. class_exists() comes last so the autoloader is only
    // consulted when Net_Gearman was actually requested.
    $useNetGearman = isset($config['type'])
        && 'net' === $config['type']
        && class_exists('Net_Gearman_Client');

    if ($useNetGearman) {
        $this->_client = new Net_Gearman_Client($servers);
    } else {
        $this->_client = new GearmanClient();
        $this->_client->addServers(implode(',', $servers));
    }
}
75 :
/**
 * Exposes the client object currently used for dispatching messages
 *
 * Intended for tests; regular callers should not need direct access
 * to the underlying client.
 *
 * @return GearmanClient|Net_Gearman_Client The client being used
 */
public function getGearmanClient()
{
    $client = $this->_client;

    return $client;
}
87 :
/**
 * Submits a background job to the named queue
 * <code>
 * $queue = Mage::getModel('gearman/queue');
 * $id = $queue->dispatchTask($task);
 * </code>
 *
 * With any client other than Net_Gearman_Client the serialized task is
 * handed to doBackground() and its return value is passed back. With
 * Net_Gearman_Client the task is wrapped in a Net_Gearman_Task and
 * submitted, and null is returned.
 *
 * @param array $task Array with the 'queue' name and the 'task' payload
 * @return mixed|null Whatever doBackground() returns (the job ID when
 *                    the Gearman extension is used), or null when
 *                    Net_Gearman is used
 */
public function dispatchTask($task)
{
    $usesNetGearman = ('Net_Gearman_Client' === get_class($this->_client));

    if (!$usesNetGearman) {
        $queueName = $task['queue'];
        $workload = serialize($task['task']);

        return $this->_client->doBackground($queueName, $workload);
    }

    $job = new Net_Gearman_Task($task['queue'], array($task['task']));
    $this->_client->submitTask($job);

    // Net_Gearman currently offers no way to query a job's status, so
    // the task handle is deliberately not returned.
    return null;
}
118 :
/**
 * Reports whether a previously submitted job has completed
 * <code>
 * if ($queue->checkTaskComplete($id)) {
 *     // work has been done
 * }
 * </code>
 *
 * The answer is the negation of the first element of the array returned
 * by the client's jobStatus() call. No status lookup is attempted for
 * Net_Gearman_Client, in which case null is returned.
 *
 * @param string $jobId The unique Gearman job ID
 * @return boolean|null Whether the task is complete, or null when
 *                      Net_Gearman is used
 */
public function checkTaskComplete($jobId)
{
    if ('Net_Gearman_Client' === get_class($this->_client)) {
        return null;
    }

    $status = $this->_client->jobStatus($jobId);
    $firstFlag = $status[0];

    return !$firstFlag;
}
140 :
/**
 * Looks up the status of a previously submitted job
 * <code>
 * while (($status = $queue->checkJobStatus($id)) !== 'done') {
 *     echo "$status% complete\n";
 *     sleep(1);
 * }
 * </code>
 *
 * The raw array from the client's jobStatus() call is translated by
 * getJobStatus(). No lookup is attempted for Net_Gearman_Client, in
 * which case null is returned.
 *
 * @param string $jobId The unique Gearman job ID
 * @return null|string|int|float The value produced by getJobStatus(),
 *                               or null when Net_Gearman is used
 * @see getJobStatus()
 */
public function checkJobStatus($jobId)
{
    if ('Net_Gearman_Client' === get_class($this->_client)) {
        return null;
    }

    $rawStatus = $this->_client->jobStatus($jobId);

    return $this->getJobStatus($rawStatus);
}
163 :
/**
 * Translates a raw Gearman status array into a reportable status
 *
 * The array is expected in the shape passed on from the client's
 * jobStatus() call: elements 0 and 1 are flags, elements 2 and 3 are
 * the numerator and denominator of the job's progress.
 *
 * - flag 0 set, flag 1 not set: 'queued'
 * - both flags set, denominator is 0 (no progress reported): 'working'
 * - both flags set, denominator non-zero: whole-number percentage
 * - neither flag set: 'done'
 * - any other combination: empty string
 *
 * @param array $status Status array with (at least) indexes 0 to 3
 * @return string|int A status keyword, or the percentage complete as an
 *                    integer (fractions are truncated)
 */
public function getJobStatus($status)
{
    $known = $status[0];
    $running = $status[1];

    if ($known && !$running) {
        return 'queued';
    }

    if ($known && $running) {
        $numerator = (int) $status[2];
        $denominator = (int) $status[3];

        // A zero denominator means there is no progress ratio to report.
        // Testing the denominator alone (after the cast) also covers
        // numeric strings and a non-zero numerator, both of which used
        // to end in a division by zero.
        if (0 === $denominator) {
            return 'working';
        }

        // Multiply before dividing so exact ratios stay exact, then
        // truncate: the result is meant to be a whole-number percentage.
        return (int) floor(($numerator * 100) / $denominator);
    }

    if (!$known && !$running) {
        return 'done';
    }

    return '';
}
191 :
/**
 * Calls a Gearman task and waits for its return value
 * <code>
 * $ret = $queue->blockingCall($task);
 * </code>
 *
 * Only supported when the client is exactly a GearmanClient; any other
 * client yields null. The call is retried (with a one second pause
 * between attempts) until it succeeds or the timeout has elapsed. A
 * result that only arrives after the timeout is discarded.
 *
 * @param array    $task    Array with the 'queue' name and the 'task'
 *                          payload
 * @param int|null $timeout Seconds to keep trying; when null the value
 *                          is read from the 'gearman_options' store
 *                          config ($opts['gearman']['timeout'])
 * @return mixed|null The unserialized result from the task, or null on
 *                    timeout or when the client is not a GearmanClient
 */
public function blockingCall($task, $timeout = null)
{
    if (get_class($this->_client) !== 'GearmanClient') {
        return null;
    }

    if (is_null($timeout)) {
        $opts = Mage::getStoreConfig('gearman_options');
        $timeout = $opts['gearman']['timeout'];
    }
    $deadline = time() + $timeout;

    // GearmanClient::do() is deprecated in favour of doNormal() and is
    // missing from newer extension releases; older releases only have
    // do(). Use whichever the loaded extension provides.
    $call = method_exists($this->_client, 'doNormal') ? 'doNormal' : 'do';

    while (true) {
        $ret = $this->_client->$call(
            $task['queue'],
            serialize($task['task'])
        );
        $code = $this->_client->returnCode();

        // Judge the timeout right after the call, not after the
        // anti-flood pause, so a quick success is never misreported as
        // a timeout.
        if (time() >= $deadline) {
            return null;
        }
        if ($code === GEARMAN_SUCCESS) {
            // NOTE(review): unserialize() runs on data produced by the
            // worker; this is only safe while workers are trusted.
            return unserialize($ret);
        }

        sleep(1); // to avoid flooding the job server with retries
        if (time() >= $deadline) {
            return null;
        }
    }
}
|