1 # -*- coding: utf-8 -*-
2 ##############################################################################
4 # pyrabbit, a Python library for easy playing with RabbitMQ
5 # Copyright (C) 2013 by Javier Sancho Fernandez <jsf at jsancho dot org>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 ##############################################################################
28 class TimeoutError(Exception):
31 class Connection(object):
32 def __init__(self, host):
34 self.open(host=self.host)
36 def open(self, host=None):
39 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
40 self.channel = self.connection.channel()
41 self.callback_queue = None
45 self.connection.close()
47 def __getattr__(self, queue_name):
48 return Queue(self, queue_name)
50 def __getitem__(self, queue_name):
51 return self.__getattr__(queue_name)
53 def receive(self, queues=[], timeout=0):
57 def _handle_timeout(signum, frame):
58 raise TimeoutError(os.strerror(errno.ETIME))
59 signal.signal(signal.SIGALRM, _handle_timeout)
67 method, properties, body = self.channel.basic_get(queues[i])
68 if i == len(queues) - 1:
72 res = Message(self, method, properties, body)
80 def __init__(self, connection, queue_name):
81 self.connection = connection
82 self.queue_name = queue_name
83 self.connection.channel.queue_declare(queue=self.queue_name)
85 def receive(self, timeout=0):
86 return self.connection.receive(queues=[self.queue_name], timeout=timeout)
88 def send(self, body, wait_response=False, timeout=0):
92 corr_id = str(uuid.uuid4())
93 if self.connection.callback_queue is None:
94 self.connection.callback_queue = self.connection.channel.queue_declare(exclusive=True).method.queue
95 properties = pika.BasicProperties(
96 reply_to=self.connection.callback_queue,
97 correlation_id=corr_id,
100 properties = pika.BasicProperties()
102 self.connection.channel.basic_publish(exchange='',
103 routing_key=self.queue_name,
104 properties=properties,
109 response = self.connection.receive(queues=[self.connection.callback_queue], timeout=timeout)
114 class Message(object):
115 def __init__(self, connection, method, properties, body):
116 self.connection = connection
118 self.properties = properties
124 self.connection.channel.basic_ack(delivery_tag=self.method.delivery_tag)
127 def response(self, body):
128 if self.properties.reply_to:
129 self.connection.channel.basic_publish(exchange='',
130 routing_key=self.properties.reply_to,
131 properties=pika.BasicProperties(correlation_id=self.properties.correlation_id),