Receiving from more than one queue (using basic_get instead consume)
authorJavier Sancho <jsf@jsancho.org>
Wed, 23 Oct 2013 11:01:41 +0000 (13:01 +0200)
committerJavier Sancho <jsf@jsancho.org>
Wed, 23 Oct 2013 11:01:41 +0000 (13:01 +0200)
pyrabbit.py

index 4cee3cf9113d94a44f9b987ce607d0ae89180ee9..5b47b4dd273b416a44ed344cb7339b6b796b7d72 100644 (file)
@@ -50,28 +50,41 @@ class Connection(object):
     def __getitem__(self, queue_name):
         return self.__getattr__(queue_name)
 
+    def receive(self, queues=[], timeout=0):
+        if len(queues) == 0:
+            return None
 
-class Queue(object):
-    def __init__(self, connection, queue_name):
-        self.connection = connection
-        self.queue_name = queue_name
-        self.connection.channel.queue_declare(queue=self.queue_name)
-
-    def receive(self, timeout=0, queue_name=None):
-        if queue_name is None:
-            queue_name = self.queue_name
         def _handle_timeout(signum, frame):
             raise TimeoutError(os.strerror(errno.ETIME))
         signal.signal(signal.SIGALRM, _handle_timeout)
         signal.alarm(timeout)
+
         res = None
         try:
-            method, properties, body = self.connection.channel.consume(queue_name).next()
-            res = Message(self.connection, method, properties, body)
+            method = None
+            i = 0
+            while method is None:
+                method, properties, body = self.channel.basic_get(queues[i])
+                if i == len(queues) - 1:
+                    i = 0
+                else:
+                    i += 1
+            res = Message(self, method, properties, body)
         finally:
             signal.alarm(0)
+
         return res
 
+
+class Queue(object):
+    def __init__(self, connection, queue_name):
+        self.connection = connection
+        self.queue_name = queue_name
+        self.connection.channel.queue_declare(queue=self.queue_name)
+
+    def receive(self, timeout=0):
+        return self.connection.receive(queues=[self.queue_name], timeout=timeout)
+
     def send(self, body, wait_response=False, timeout=0):
         properties = None
 
@@ -93,7 +106,7 @@ class Queue(object):
 
         response = None
         if wait_response:
-            response = self.receive(timeout=timeout, queue_name=self.connection.callback_queue)
+            response = self.connection.receive(queues=[self.connection.callback_queue], timeout=timeout)
 
         return response
 
@@ -104,13 +117,14 @@ class Message(object):
         self.method = method
         self.properties = properties
         self.body = body
+        self._ack = False
 
     def ack(self):
-        self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag)
+        if not self._ack:
+            self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag)
+            self._ack = True
 
     def response(self, body):
-        print self.properties
-        print self.properties.reply_to
         if self.properties.reply_to:
             self.connection.channel.basic_publish(exchange='',
                                                   routing_key=self.properties.reply_to,