]> git.jsancho.org Git - pyrabbit.git/blobdiff - pyrabbit.py
Incremental delay when receiving messages (better performance)
[pyrabbit.git] / pyrabbit.py
index 4cee3cf9113d94a44f9b987ce607d0ae89180ee9..351e98d6d39994afe7ce3271ae8a952b901740f7 100644 (file)
@@ -22,7 +22,7 @@
 import errno
 import os
 import pika
-import signal
+import time
 import uuid
 
 class TimeoutError(Exception):
@@ -50,6 +50,29 @@ class Connection(object):
     def __getitem__(self, queue_name):
         return self.__getattr__(queue_name)
 
+    def receive(self, queues=[], timeout=0):
+        if len(queues) == 0:
+            return None
+
+        t_start = time.time()
+        method = None
+        i = 0
+        delay = 0.0
+        while method is None and (time.time()-t_start < timeout or timeout <= 0):
+            time.sleep(delay)
+            if delay < 1:
+                delay += 0.01
+            method, properties, body = self.channel.basic_get(queues[i])
+            if i == len(queues) - 1:
+                i = 0
+            else:
+                i += 1
+
+        if method is None:
+            raise TimeoutError(os.strerror(errno.ETIME))
+        else:
+            return Message(self, method, properties, body)
+
 
 class Queue(object):
     def __init__(self, connection, queue_name):
@@ -57,20 +80,8 @@ class Queue(object):
         self.queue_name = queue_name
         self.connection.channel.queue_declare(queue=self.queue_name)
 
-    def receive(self, timeout=0, queue_name=None):
-        if queue_name is None:
-            queue_name = self.queue_name
-        def _handle_timeout(signum, frame):
-            raise TimeoutError(os.strerror(errno.ETIME))
-        signal.signal(signal.SIGALRM, _handle_timeout)
-        signal.alarm(timeout)
-        res = None
-        try:
-            method, properties, body = self.connection.channel.consume(queue_name).next()
-            res = Message(self.connection, method, properties, body)
-        finally:
-            signal.alarm(0)
-        return res
+    def receive(self, timeout=0):
+        return self.connection.receive(queues=[self.queue_name], timeout=timeout)
 
     def send(self, body, wait_response=False, timeout=0):
         properties = None
@@ -93,7 +104,7 @@ class Queue(object):
 
         response = None
         if wait_response:
-            response = self.receive(timeout=timeout, queue_name=self.connection.callback_queue)
+            response = self.connection.receive(queues=[self.connection.callback_queue], timeout=timeout)
 
         return response
 
@@ -104,13 +115,14 @@ class Message(object):
         self.method = method
         self.properties = properties
         self.body = body
+        self._ack = False
 
     def ack(self):
-        self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag)
+        if not self._ack:
+            self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag)
+            self._ack = True
 
     def response(self, body):
-        print self.properties
-        print self.properties.reply_to
         if self.properties.reply_to:
             self.connection.channel.basic_publish(exchange='',
                                                   routing_key=self.properties.reply_to,