From 659feb5dc696d8e37061ce88f12c52e77e75b8bc Mon Sep 17 00:00:00 2001 From: Javier Sancho Date: Wed, 23 Oct 2013 13:01:41 +0200 Subject: [PATCH] Receiving from more than one queue (using basic_get instead consume) --- pyrabbit.py | 44 +++++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/pyrabbit.py b/pyrabbit.py index 4cee3cf..5b47b4d 100644 --- a/pyrabbit.py +++ b/pyrabbit.py @@ -50,28 +50,41 @@ class Connection(object): def __getitem__(self, queue_name): return self.__getattr__(queue_name) + def receive(self, queues=[], timeout=0): + if len(queues) == 0: + return None -class Queue(object): - def __init__(self, connection, queue_name): - self.connection = connection - self.queue_name = queue_name - self.connection.channel.queue_declare(queue=self.queue_name) - - def receive(self, timeout=0, queue_name=None): - if queue_name is None: - queue_name = self.queue_name def _handle_timeout(signum, frame): raise TimeoutError(os.strerror(errno.ETIME)) signal.signal(signal.SIGALRM, _handle_timeout) signal.alarm(timeout) + res = None try: - method, properties, body = self.connection.channel.consume(queue_name).next() - res = Message(self.connection, method, properties, body) + method = None + i = 0 + while method is None: + method, properties, body = self.channel.basic_get(queues[i]) + if i == len(queues) - 1: + i = 0 + else: + i += 1 + res = Message(self, method, properties, body) finally: signal.alarm(0) + return res + +class Queue(object): + def __init__(self, connection, queue_name): + self.connection = connection + self.queue_name = queue_name + self.connection.channel.queue_declare(queue=self.queue_name) + + def receive(self, timeout=0): + return self.connection.receive(queues=[self.queue_name], timeout=timeout) + def send(self, body, wait_response=False, timeout=0): properties = None @@ -93,7 +106,7 @@ class Queue(object): response = None if wait_response: - response = self.receive(timeout=timeout, queue_name=self.connection.callback_queue) + response = self.connection.receive(queues=[self.connection.callback_queue], timeout=timeout) return response @@ -104,13 +117,14 @@ class Message(object): self.method = method self.properties = properties self.body = body + self._ack = False def ack(self): - self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag) + if not self._ack: + self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag) + self._ack = True def response(self, body): - print self.properties - print self.properties.reply_to if self.properties.reply_to: self.connection.channel.basic_publish(exchange='', routing_key=self.properties.reply_to, -- 2.39.2