X-Git-Url: https://git.jsancho.org/?p=pyrabbit.git;a=blobdiff_plain;f=pyrabbit.py;h=351e98d6d39994afe7ce3271ae8a952b901740f7;hp=4cee3cf9113d94a44f9b987ce607d0ae89180ee9;hb=HEAD;hpb=51073508e2cd1c5109bfc54d24472186b9f5f99a diff --git a/pyrabbit.py b/pyrabbit.py index 4cee3cf..351e98d 100644 --- a/pyrabbit.py +++ b/pyrabbit.py @@ -22,7 +22,7 @@ import errno import os import pika -import signal +import time import uuid class TimeoutError(Exception): @@ -50,6 +50,29 @@ class Connection(object): def __getitem__(self, queue_name): return self.__getattr__(queue_name) + def receive(self, queues=[], timeout=0): + if len(queues) == 0: + return None + + t_start = time.time() + method = None + i = 0 + delay = 0.0 + while method is None and (time.time()-t_start < timeout or timeout <= 0): + time.sleep(delay) + if delay < 1: + delay += 0.01 + method, properties, body = self.channel.basic_get(queues[i]) + if i == len(queues) - 1: + i = 0 + else: + i += 1 + + if method is None: + raise TimeoutError(os.strerror(errno.ETIME)) + else: + return Message(self, method, properties, body) + class Queue(object): def __init__(self, connection, queue_name): @@ -57,20 +80,8 @@ class Queue(object): self.queue_name = queue_name self.connection.channel.queue_declare(queue=self.queue_name) - def receive(self, timeout=0, queue_name=None): - if queue_name is None: - queue_name = self.queue_name - def _handle_timeout(signum, frame): - raise TimeoutError(os.strerror(errno.ETIME)) - signal.signal(signal.SIGALRM, _handle_timeout) - signal.alarm(timeout) - res = None - try: - method, properties, body = self.connection.channel.consume(queue_name).next() - res = Message(self.connection, method, properties, body) - finally: - signal.alarm(0) - return res + def receive(self, timeout=0): + return self.connection.receive(queues=[self.queue_name], timeout=timeout) def send(self, body, wait_response=False, timeout=0): properties = None @@ -93,7 +104,7 @@ class Queue(object): response = None if wait_response: - response = self.receive(timeout=timeout, queue_name=self.connection.callback_queue) + response = self.connection.receive(queues=[self.connection.callback_queue], timeout=timeout) return response @@ -104,13 +115,14 @@ class Message(object): self.method = method self.properties = properties self.body = body + self._ack = False def ack(self): - self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag) + if not self._ack: + self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag) + self._ack = True def response(self, body): - print self.properties - print self.properties.reply_to if self.properties.reply_to: self.connection.channel.basic_publish(exchange='', routing_key=self.properties.reply_to,