import errno
import os
import pika
-import signal
+import time
import uuid
class TimeoutError(Exception):
def __getitem__(self, queue_name):
return self.__getattr__(queue_name)
+ def receive(self, queues=[], timeout=0):
+ if len(queues) == 0:
+ return None
+
+ t_start = time.time()
+ method = None
+ i = 0
+ delay = 0.0
+ while method is None and (time.time()-t_start < timeout or timeout <= 0):
+ time.sleep(delay)
+ if delay < 1:
+ delay += 0.01
+ method, properties, body = self.channel.basic_get(queues[i])
+ if i == len(queues) - 1:
+ i = 0
+ else:
+ i += 1
+
+ if method is None:
+ raise TimeoutError(os.strerror(errno.ETIME))
+ else:
+ return Message(self, method, properties, body)
+
class Queue(object):
def __init__(self, connection, queue_name):
self.queue_name = queue_name
self.connection.channel.queue_declare(queue=self.queue_name)
- def receive(self, timeout=0, queue_name=None):
- if queue_name is None:
- queue_name = self.queue_name
- def _handle_timeout(signum, frame):
- raise TimeoutError(os.strerror(errno.ETIME))
- signal.signal(signal.SIGALRM, _handle_timeout)
- signal.alarm(timeout)
- res = None
- try:
- method, properties, body = self.connection.channel.consume(queue_name).next()
- res = Message(self.connection, method, properties, body)
- finally:
- signal.alarm(0)
- return res
+ def receive(self, timeout=0):
+ return self.connection.receive(queues=[self.queue_name], timeout=timeout)
def send(self, body, wait_response=False, timeout=0):
properties = None
response = None
if wait_response:
- response = self.receive(timeout=timeout, queue_name=self.connection.callback_queue)
+ response = self.connection.receive(queues=[self.connection.callback_queue], timeout=timeout)
return response
self.method = method
self.properties = properties
self.body = body
+ self._ack = False
def ack(self):
- self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag)
+ if not self._ack:
+ self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag)
+ self._ack = True
def response(self, body):
- print self.properties
- print self.properties.reply_to
if self.properties.reply_to:
self.connection.channel.basic_publish(exchange='',
routing_key=self.properties.reply_to,