git.jsancho.org Git - pyrabbit.git/blob - pyrabbit.py
Incremental delay when receiving messages (better performance)
[pyrabbit.git] / pyrabbit.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    pyrabbit, a Python library for easy playing with RabbitMQ
5 #    Copyright (C) 2013 by Javier Sancho Fernandez <jsf at jsancho dot org>
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU General Public License as published by
9 #    the Free Software Foundation, either version 3 of the License, or
10 #    (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU General Public License for more details.
16 #
17 #    You should have received a copy of the GNU General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 import errno
23 import os
24 import pika
25 import time
26 import uuid
27
class TimeoutError(Exception):
    """Raised when no message arrives before the requested timeout expires."""
30
class Connection(object):
    """Blocking pika connection to one host, together with a single channel.

    Queues are reached by attribute or item access (``conn.jobs`` or
    ``conn['jobs']``); both build a ``Queue`` bound to this connection.
    """

    # Names of the instance's own attributes.  If one of them is missing
    # (for example before ``open()`` has completed), ``__getattr__`` must fail
    # instead of treating the name as a queue: building a ``Queue`` reads
    # ``connection.channel``, which would re-enter ``__getattr__`` endlessly.
    _OWN_ATTRIBUTES = ('host', 'connection', 'channel', 'callback_queue')

    def __init__(self, host):
        """Remember *host* and open the connection to it immediately."""
        self.host = host
        self.open(host=self.host)

    def open(self, host=None):
        """Open a blocking connection and a channel.

        host -- host to connect to; defaults to the host given at construction.

        Also resets ``callback_queue`` to None.
        """
        if host is None:
            host = self.host
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
        self.channel = self.connection.channel()
        self.callback_queue = None

    def close(self):
        """Cancel the channel and close the connection."""
        try:
            self.channel.cancel()
        finally:
            # Close the connection even when cancelling the channel fails,
            # so the underlying connection is not leaked.
            self.connection.close()

    def __getattr__(self, queue_name):
        """Return a ``Queue`` for any attribute that is not otherwise defined.

        Raises AttributeError for dunder names and for the instance's own
        attribute names, so protocol probes (copy, pickle, ``hasattr``) and
        access to a not-yet-opened connection fail normally instead of
        creating queues or recursing.
        """
        is_dunder = queue_name.startswith('__') and queue_name.endswith('__')
        if is_dunder or queue_name in self._OWN_ATTRIBUTES:
            raise AttributeError(queue_name)
        return Queue(self, queue_name)

    def __getitem__(self, queue_name):
        """Return a ``Queue`` for *queue_name*; any name is accepted here."""
        return Queue(self, queue_name)

    def receive(self, queues=None, timeout=0):
        """Poll *queues* in round-robin order until one yields a message.

        queues  -- sequence of queue names; None or empty returns None at once.
        timeout -- seconds to wait; zero or a negative value waits forever.

        The pause before each poll starts at zero and grows by 0.01 seconds
        per poll until it reaches about one second.

        Returns a ``Message`` wrapping the first message obtained.
        Raises TimeoutError when *timeout* elapses without a message.
        """
        if not queues:
            return None

        t_start = time.time()
        method = properties = body = None
        i = 0
        delay = 0.0
        while method is None:
            if timeout > 0:
                remaining = timeout - (time.time() - t_start)
                if remaining <= 0:
                    break
                # Never sleep past the deadline: the back-off can reach one
                # second, which would otherwise overshoot a short timeout.
                time.sleep(min(delay, remaining))
            else:
                time.sleep(delay)
            if delay < 1:
                delay += 0.01
            method, properties, body = self.channel.basic_get(queues[i])
            i = (i + 1) % len(queues)

        if method is None:
            raise TimeoutError(os.strerror(errno.ETIME))
        return Message(self, method, properties, body)
75
76
class Queue(object):
    """A named queue on a ``Connection``; it is declared on construction."""

    def __init__(self, connection, queue_name):
        """Bind to *connection* and declare *queue_name* on its channel."""
        self.connection = connection
        self.queue_name = queue_name
        self.connection.channel.queue_declare(queue=self.queue_name)

    def receive(self, timeout=0):
        """Wait for a message on this queue.

        Delegates to ``connection.receive`` with this queue as the only one
        polled and returns whatever that call returns.
        """
        return self.connection.receive(queues=[self.queue_name], timeout=timeout)

    def send(self, body, wait_response=False, timeout=0):
        """Publish *body* to this queue through the default exchange.

        wait_response -- when true, attach a reply queue and a fresh
                         correlation id, then wait for the reply.
        timeout       -- seconds to wait for the reply; zero or a negative
                         value waits forever.

        Returns None when no response is requested, otherwise the reply
        obtained from ``connection.receive``.
        Raises TimeoutError when the reply does not arrive in time.
        """
        channel = self.connection.channel
        corr_id = None

        if wait_response:
            corr_id = str(uuid.uuid4())
            if self.connection.callback_queue is None:
                # An explicit empty name asks the broker to generate one; the
                # argument is spelled out because newer pika releases require it.
                declared = channel.queue_declare(queue='', exclusive=True)
                self.connection.callback_queue = declared.method.queue
            properties = pika.BasicProperties(
                reply_to=self.connection.callback_queue,
                correlation_id=corr_id,
                )
        else:
            properties = pika.BasicProperties()

        channel.basic_publish(exchange='',
                              routing_key=self.queue_name,
                              properties=properties,
                              body=body)

        if not wait_response:
            return None
        return self._wait_reply(corr_id, timeout)

    def _wait_reply(self, corr_id, timeout):
        """Return the reply matching *corr_id*, discarding stale replies."""
        t_start = time.time()
        while True:
            remaining = timeout
            if timeout > 0:
                remaining = timeout - (time.time() - t_start)
                if remaining <= 0:
                    raise TimeoutError(os.strerror(errno.ETIME))
            reply = self.connection.receive(queues=[self.connection.callback_queue],
                                            timeout=remaining)
            reply_id = reply.properties.correlation_id
            # A reply without a correlation id is accepted as before.  One
            # carrying a different id answers an earlier request (typically
            # one that already timed out), so it is acknowledged and dropped.
            if reply_id is None or reply_id == corr_id:
                return reply
            reply.ack()
110
111
class Message(object):
    """A received message together with the connection it arrived on."""

    def __init__(self, connection, method, properties, body):
        """Store the delivery parts; the message starts out unacknowledged."""
        self._ack = False
        self.body = body
        self.properties = properties
        self.method = method
        self.connection = connection

    def ack(self):
        """Acknowledge the message once; further calls do nothing."""
        if self._ack:
            return
        channel = self.connection.channel
        channel.basic_ack(delivery_tag=self.method.delivery_tag)
        self._ack = True

    def response(self, body):
        """Publish *body* to the sender's reply queue, if it named one.

        The reply goes through the default exchange and carries the
        correlation id of this message.  Nothing is sent when the message
        has no ``reply_to``.
        """
        if not self.properties.reply_to:
            return
        publish = self.connection.channel.basic_publish
        reply_queue = self.properties.reply_to
        reply_properties = pika.BasicProperties(correlation_id=self.properties.correlation_id)
        publish(exchange='',
                routing_key=reply_queue,
                properties=reply_properties,
                body=body)