Initial commit
[pyrabbit.git] / pyrabbit.py
1 # -*- coding: utf-8 -*-
2 ##############################################################################
3 #
4 #    pyrabbit, a Python library for easy playing with RabbitMQ
5 #    Copyright (C) 2013 by Javier Sancho Fernandez <jsf at jsancho dot org>
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU General Public License as published by
9 #    the Free Software Foundation, either version 3 of the License, or
10 #    (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU General Public License for more details.
16 #
17 #    You should have received a copy of the GNU General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 ##############################################################################
21
22 import errno
23 import os
24 import pika
25 import signal
26 import uuid
27
class TimeoutError(Exception):
    """Raised when waiting for a message takes longer than the given timeout."""
30
class Connection(object):
    """Blocking connection to a RabbitMQ host that exposes queues by name.

    ``conn.jobs`` and ``conn['jobs']`` both return a ``Queue`` named ``jobs``
    bound to this connection's channel.

    Attributes:
        host: host name passed to the constructor.
        connection: the underlying ``pika.BlockingConnection``.
        channel: the channel opened on ``connection``.
        callback_queue: name of the reply queue used for request/response
            sends, or ``None`` until one has been declared.
    """

    # Names of this object's own attributes.  ``__getattr__`` only runs when
    # normal lookup fails, so if one of these is missing (for instance because
    # ``open`` raised half-way) it must be reported as missing: building a
    # Queue instead would read ``self.channel`` again and recurse forever.
    _RESERVED = frozenset(('host', 'connection', 'channel', 'callback_queue'))

    def __init__(self, host):
        """Remember *host* and open the connection to it immediately."""
        self.host = host
        self.open(host=self.host)

    def open(self, host=None):
        """Open a blocking connection and a channel.

        Args:
            host: host to connect to; defaults to the host given at
                construction time.
        """
        target = self.host if host is None else host
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=target))
        self.channel = self.connection.channel()
        # A reply queue declared on a previous channel is not carried over.
        self.callback_queue = None

    def close(self):
        """Cancel the channel's consumer and close the connection.

        The connection is closed even when cancelling raises, so a failure in
        ``channel.cancel()`` cannot leak the underlying connection.
        """
        try:
            self.channel.cancel()
        finally:
            self.connection.close()

    def __getattr__(self, queue_name):
        """Return the ``Queue`` called *queue_name* (attribute-style access).

        Raises:
            AttributeError: for this object's own attribute names and for
                ``__dunder__`` names, so protocol probes (``copy``,
                ``hasattr`` ...) do not declare queues as a side effect.
                Such queue names remain reachable through ``conn[name]``.
        """
        is_dunder = queue_name.startswith('__') and queue_name.endswith('__')
        if is_dunder or queue_name in self._RESERVED:
            raise AttributeError(queue_name)
        return Queue(self, queue_name)

    def __getitem__(self, queue_name):
        """Return the ``Queue`` called *queue_name* (item-style access)."""
        return Queue(self, queue_name)
52
53
class Queue(object):
    """A named queue reached through a ``Connection``'s channel.

    Attributes:
        connection: the owning ``Connection``.
        queue_name: name of the queue; it is declared on construction.
    """

    def __init__(self, connection, queue_name):
        """Bind to *connection* and declare *queue_name* on its channel."""
        self.connection = connection
        self.queue_name = queue_name
        self.connection.channel.queue_declare(queue=self.queue_name)

    def receive(self, timeout=0, queue_name=None):
        """Block until the next message arrives and return it.

        The timeout is implemented with ``SIGALRM``, so it is only available
        on Unix and only from the main thread.

        Args:
            timeout: whole seconds to wait; ``0`` sets no alarm, i.e. waits
                indefinitely.
            queue_name: queue to consume from; defaults to this queue.

        Returns:
            A ``Message`` wrapping the delivered method, properties and body.

        Raises:
            TimeoutError: if no message arrived within *timeout* seconds.
        """
        if queue_name is None:
            queue_name = self.queue_name

        def _handle_timeout(signum, frame):
            raise TimeoutError(os.strerror(errno.ETIME))

        # Keep whatever handler the application had installed so it can be
        # put back afterwards instead of being silently replaced.
        previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
        signal.alarm(timeout)
        try:
            # next() works on both Python 2 and 3, unlike the .next() method.
            method, properties, body = next(self.connection.channel.consume(queue_name))
            return Message(self.connection, method, properties, body)
        finally:
            # Disarm first so a pending alarm cannot fire into the restored
            # handler.
            signal.alarm(0)
            # signal.signal() returns None for handlers not installed from
            # Python; those cannot be re-installed from here.
            if previous_handler is not None:
                signal.signal(signal.SIGALRM, previous_handler)

    def send(self, body, wait_response=False, timeout=0):
        """Publish *body* to this queue through the default exchange.

        Args:
            body: message payload.
            wait_response: when true, attach ``reply_to``/``correlation_id``
                properties and block for a reply on the connection's
                exclusive callback queue (declared on first use).
            timeout: seconds to wait for the reply, as in ``receive``.

        Returns:
            The reply ``Message`` when *wait_response* is true, else ``None``.

        Raises:
            TimeoutError: if a reply was requested and none arrived in time.
        """
        channel = self.connection.channel

        if wait_response:
            corr_id = str(uuid.uuid4())
            if self.connection.callback_queue is None:
                declared = channel.queue_declare(exclusive=True)
                self.connection.callback_queue = declared.method.queue
            properties = pika.BasicProperties(
                reply_to=self.connection.callback_queue,
                correlation_id=corr_id,
                )
        else:
            properties = pika.BasicProperties()

        channel.basic_publish(exchange='',
                              routing_key=self.queue_name,
                              properties=properties,
                              body=body)

        if not wait_response:
            return None
        # NOTE(review): the reply's correlation_id is not compared with
        # corr_id, so a late reply to an earlier, timed-out request would be
        # returned here.  Callers that care should check
        # ``response.properties.correlation_id`` themselves.
        return self.receive(timeout=timeout, queue_name=self.connection.callback_queue)
99
100
class Message(object):
    """A message delivered from a queue.

    Attributes:
        connection: the ``Connection`` the message was received on.
        method: delivery method frame as yielded by ``channel.consume``;
            its ``delivery_tag`` is used for acknowledging.
        properties: message properties; ``reply_to`` and ``correlation_id``
            are used when responding.
        body: message payload.
    """

    def __init__(self, connection, method, properties, body):
        """Store the delivery data together with the originating connection."""
        self.connection = connection
        self.method = method
        self.properties = properties
        self.body = body

    def ack(self):
        """Acknowledge this message on the connection's channel."""
        self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag)

    def response(self, body):
        """Send *body* back to the sender, if it asked for a reply.

        The reply is published through the default exchange to the queue
        named in ``properties.reply_to`` and carries the request's
        ``correlation_id``.  Nothing is sent when ``reply_to`` is empty.
        """
        reply_to = self.properties.reply_to
        if not reply_to:
            return
        reply_properties = pika.BasicProperties(
            correlation_id=self.properties.correlation_id)
        self.connection.channel.basic_publish(exchange='',
                                              routing_key=reply_to,
                                              properties=reply_properties,
                                              body=body)