def __getitem__(self, queue_name):
return self.__getattr__(queue_name)
+ def receive(self, queues=[], timeout=0):
+ if len(queues) == 0:
+ return None
-class Queue(object):
- def __init__(self, connection, queue_name):
- self.connection = connection
- self.queue_name = queue_name
- self.connection.channel.queue_declare(queue=self.queue_name)
-
- def receive(self, timeout=0, queue_name=None):
- if queue_name is None:
- queue_name = self.queue_name
def _handle_timeout(signum, frame):
raise TimeoutError(os.strerror(errno.ETIME))
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(timeout)
+
res = None
try:
- method, properties, body = self.connection.channel.consume(queue_name).next()
- res = Message(self.connection, method, properties, body)
+ method = None
+ i = 0
+ while method is None:
+ method, properties, body = self.channel.basic_get(queues[i])
+ if i == len(queues) - 1:
+ i = 0
+ else:
+ i += 1
+ res = Message(self, method, properties, body)
finally:
signal.alarm(0)
+
return res
+
+class Queue(object):
+ def __init__(self, connection, queue_name):
+ self.connection = connection
+ self.queue_name = queue_name
+ self.connection.channel.queue_declare(queue=self.queue_name)
+
+ def receive(self, timeout=0):
+ return self.connection.receive(queues=[self.queue_name], timeout=timeout)
+
def send(self, body, wait_response=False, timeout=0):
properties = None
response = None
if wait_response:
- response = self.receive(timeout=timeout, queue_name=self.connection.callback_queue)
+ response = self.connection.receive(queues=[self.connection.callback_queue], timeout=timeout)
return response
self.method = method
self.properties = properties
self.body = body
+ self._ack = False
def ack(self):
- self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag)
+ if not self._ack:
+ self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag)
+ self._ack = True
def response(self, body):
- print self.properties
- print self.properties.reply_to
if self.properties.reply_to:
self.connection.channel.basic_publish(exchange='',
routing_key=self.properties.reply_to,