"""AMQPStorm Channel.Basic."""
import logging
import math
from pamqp import body as pamqp_body
from pamqp import header as pamqp_header
from pamqp import specification
from amqpstorm import compatibility
from amqpstorm.base import Handler
from amqpstorm.base import MAX_FRAME_SIZE
from amqpstorm.exception import AMQPChannelError
from amqpstorm.exception import AMQPInvalidArgument
from amqpstorm.message import Message
LOGGER = logging.getLogger(__name__)
[docs]class Basic(Handler):
"""RabbitMQ Basic Operations."""
__slots__ = ['_max_frame_size']
def __init__(self, channel, max_frame_size=None):
super(Basic, self).__init__(channel)
self._max_frame_size = max_frame_size or MAX_FRAME_SIZE
[docs] def qos(self, prefetch_count=0, prefetch_size=0, global_=False):
"""Specify quality of service.
:param int prefetch_count: Prefetch window in messages
:param int/long prefetch_size: Prefetch window in octets
:param bool global_: Apply to entire connection
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_integer(prefetch_count):
raise AMQPInvalidArgument('prefetch_count should be an integer')
elif not compatibility.is_integer(prefetch_size):
raise AMQPInvalidArgument('prefetch_size should be an integer')
elif not isinstance(global_, bool):
raise AMQPInvalidArgument('global_ should be a boolean')
qos_frame = specification.Basic.Qos(prefetch_count=prefetch_count,
prefetch_size=prefetch_size,
global_=global_)
return self._channel.rpc_request(qos_frame)
[docs] def get(self, queue='', no_ack=False, to_dict=False, auto_decode=True):
"""Fetch a single message.
:param str queue: Queue name
:param bool no_ack: No acknowledgement needed
:param bool to_dict: Should incoming messages be converted to a
dictionary before delivery.
:param bool auto_decode: Auto-decode strings when possible.
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:returns: Returns a single message, as long as there is a message in
the queue. If no message is available, returns None.
:rtype: amqpstorm.Message,dict,None
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not isinstance(no_ack, bool):
raise AMQPInvalidArgument('no_ack should be a boolean')
elif self._channel.consumer_tags:
raise AMQPChannelError("Cannot call 'get' when channel is "
"set to consume")
get_frame = specification.Basic.Get(queue=queue,
no_ack=no_ack)
with self._channel.lock and self._channel.rpc.lock:
message = self._get_message(get_frame, auto_decode=auto_decode)
if message and to_dict:
return message.to_dict()
return message
[docs] def recover(self, requeue=False):
"""Redeliver unacknowledged messages.
:param bool requeue: Re-queue the messages
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not isinstance(requeue, bool):
raise AMQPInvalidArgument('requeue should be a boolean')
recover_frame = specification.Basic.Recover(requeue=requeue)
return self._channel.rpc_request(recover_frame)
[docs] def consume(self, callback=None, queue='', consumer_tag='',
exclusive=False, no_ack=False, no_local=False, arguments=None):
"""Start a queue consumer.
:param function callback: Message callback
:param str queue: Queue name
:param str consumer_tag: Consumer tag
:param bool no_local: Do not deliver own messages
:param bool no_ack: No acknowledgement needed
:param bool exclusive: Request exclusive access
:param dict arguments: Consume key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:returns: Consumer tag
:rtype: str
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not compatibility.is_string(consumer_tag):
raise AMQPInvalidArgument('consumer_tag should be a string')
elif not isinstance(exclusive, bool):
raise AMQPInvalidArgument('exclusive should be a boolean')
elif not isinstance(no_ack, bool):
raise AMQPInvalidArgument('no_ack should be a boolean')
elif not isinstance(no_local, bool):
raise AMQPInvalidArgument('no_local should be a boolean')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
consume_rpc_result = self._consume_rpc_request(arguments, consumer_tag,
exclusive, no_ack,
no_local, queue)
tag = self._consume_add_and_get_tag(consume_rpc_result)
self._channel._consumer_callbacks[tag] = callback
return tag
[docs] def cancel(self, consumer_tag=''):
"""Cancel a queue consumer.
:param str consumer_tag: Consumer tag
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(consumer_tag):
raise AMQPInvalidArgument('consumer_tag should be a string')
cancel_frame = specification.Basic.Cancel(consumer_tag=consumer_tag)
result = self._channel.rpc_request(cancel_frame)
self._channel.remove_consumer_tag(consumer_tag)
return result
[docs] def publish(self, body, routing_key, exchange='', properties=None,
mandatory=False, immediate=False):
"""Publish a Message.
:param bytes,str,unicode body: Message payload
:param str routing_key: Message routing key
:param str exchange: The exchange to publish the message to
:param dict properties: Message properties
:param bool mandatory: Requires the message is published
:param bool immediate: Request immediate delivery
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: bool,None
"""
self._validate_publish_parameters(body, exchange, immediate, mandatory,
properties, routing_key)
properties = properties or {}
body = self._handle_utf8_payload(body, properties)
properties = specification.Basic.Properties(**properties)
method_frame = specification.Basic.Publish(exchange=exchange,
routing_key=routing_key,
mandatory=mandatory,
immediate=immediate)
header_frame = pamqp_header.ContentHeader(body_size=len(body),
properties=properties)
frames_out = [method_frame, header_frame]
for body_frame in self._create_content_body(body):
frames_out.append(body_frame)
if self._channel.confirming_deliveries:
with self._channel.rpc.lock:
return self._publish_confirm(frames_out, mandatory)
self._channel.write_frames(frames_out)
[docs] def ack(self, delivery_tag=0, multiple=False):
"""Acknowledge Message.
:param int/long delivery_tag: Server-assigned delivery tag
:param bool multiple: Acknowledge multiple messages
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer')
elif not isinstance(multiple, bool):
raise AMQPInvalidArgument('multiple should be a boolean')
ack_frame = specification.Basic.Ack(delivery_tag=delivery_tag,
multiple=multiple)
self._channel.write_frame(ack_frame)
[docs] def nack(self, delivery_tag=0, multiple=False, requeue=True):
"""Negative Acknowledgement.
:param int/long delivery_tag: Server-assigned delivery tag
:param bool multiple: Negative acknowledge multiple messages
:param bool requeue: Re-queue the message
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer')
elif not isinstance(multiple, bool):
raise AMQPInvalidArgument('multiple should be a boolean')
elif not isinstance(requeue, bool):
raise AMQPInvalidArgument('requeue should be a boolean')
nack_frame = specification.Basic.Nack(delivery_tag=delivery_tag,
multiple=multiple,
requeue=requeue)
self._channel.write_frame(nack_frame)
[docs] def reject(self, delivery_tag=0, requeue=True):
"""Reject Message.
:param int/long delivery_tag: Server-assigned delivery tag
:param bool requeue: Re-queue the message
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer')
elif not isinstance(requeue, bool):
raise AMQPInvalidArgument('requeue should be a boolean')
reject_frame = specification.Basic.Reject(delivery_tag=delivery_tag,
requeue=requeue)
self._channel.write_frame(reject_frame)
def _consume_add_and_get_tag(self, consume_rpc_result):
"""Add the tag to the channel and return it.
:param dict consume_rpc_result:
:rtype: str
"""
consumer_tag = consume_rpc_result['consumer_tag']
self._channel.add_consumer_tag(consumer_tag)
return consumer_tag
def _consume_rpc_request(self, arguments, consumer_tag, exclusive, no_ack,
no_local, queue):
"""Create a Consume Frame and execute a RPC request.
:param str queue: Queue name
:param str consumer_tag: Consumer tag
:param bool no_local: Do not deliver own messages
:param bool no_ack: No acknowledgement needed
:param bool exclusive: Request exclusive access
:param dict arguments: Consume key/value arguments
:rtype: dict
"""
consume_frame = specification.Basic.Consume(queue=queue,
consumer_tag=consumer_tag,
exclusive=exclusive,
no_local=no_local,
no_ack=no_ack,
arguments=arguments)
return self._channel.rpc_request(consume_frame)
@staticmethod
def _validate_publish_parameters(body, exchange, immediate, mandatory,
properties, routing_key):
"""Validate Publish Parameters.
:param bytes,str,unicode body: Message payload
:param str routing_key: Message routing key
:param str exchange: The exchange to publish the message to
:param dict properties: Message properties
:param bool mandatory: Requires the message is published
:param bool immediate: Request immediate delivery
:raises AMQPInvalidArgument: Invalid Parameters
:return:
"""
if not compatibility.is_string(body):
raise AMQPInvalidArgument('body should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
elif properties is not None and not isinstance(properties, dict):
raise AMQPInvalidArgument('properties should be a dict or None')
elif not isinstance(mandatory, bool):
raise AMQPInvalidArgument('mandatory should be a boolean')
elif not isinstance(immediate, bool):
raise AMQPInvalidArgument('immediate should be a boolean')
@staticmethod
def _handle_utf8_payload(body, properties):
"""Update the Body and Properties to the appropriate encoding.
:param bytes,str,unicode body: Message payload
:param dict properties: Message properties
:return:
"""
if 'content_encoding' not in properties:
properties['content_encoding'] = 'utf-8'
encoding = properties['content_encoding']
if compatibility.is_unicode(body):
body = body.encode(encoding)
elif compatibility.PYTHON3 and isinstance(body, str):
body = bytes(body, encoding=encoding)
return body
def _get_message(self, get_frame, auto_decode):
"""Get and return a message using a Basic.Get frame.
:param Basic.Get get_frame:
:param bool auto_decode: Auto-decode strings when possible.
:rtype: Message
"""
message_uuid = self._channel.rpc.register_request(
get_frame.valid_responses + ['ContentHeader', 'ContentBody']
)
try:
self._channel.write_frame(get_frame)
get_ok_frame = self._channel.rpc.get_request(message_uuid,
raw=True,
multiple=True)
if isinstance(get_ok_frame, specification.Basic.GetEmpty):
return None
content_header = self._channel.rpc.get_request(message_uuid,
raw=True,
multiple=True)
body = self._get_content_body(message_uuid,
content_header.body_size)
finally:
self._channel.rpc.remove(message_uuid)
return Message(channel=self._channel,
body=body,
method=dict(get_ok_frame),
properties=dict(content_header.properties),
auto_decode=auto_decode)
def _publish_confirm(self, frames_out, mandatory):
"""Confirm that message was published successfully.
:param list frames_out:
:rtype: bool
"""
confirm_uuid = self._channel.rpc.register_request(['Basic.Ack',
'Basic.Nack'])
self._channel.write_frames(frames_out)
result = self._channel.rpc.get_request(confirm_uuid, raw=True)
if mandatory:
self._channel.check_for_exceptions()
if isinstance(result, specification.Basic.Ack):
return True
return False
def _create_content_body(self, body):
"""Split body based on the maximum frame size.
This function is based on code from Rabbitpy.
https://github.com/gmr/rabbitpy
:param bytes,str,unicode body: Message payload
:rtype: collections.Iterable
"""
frames = int(math.ceil(len(body) / float(self._max_frame_size)))
for offset in compatibility.RANGE(0, frames):
start_frame = self._max_frame_size * offset
end_frame = start_frame + self._max_frame_size
body_len = len(body)
if end_frame > body_len:
end_frame = body_len
yield pamqp_body.ContentBody(body[start_frame:end_frame])
def _get_content_body(self, message_uuid, body_size):
"""Get Content Body using RPC requests.
:param str uuid_body: Rpc Identifier.
:param int body_size: Content Size.
:rtype: str
"""
body = bytes()
while len(body) < body_size:
body_piece = self._channel.rpc.get_request(message_uuid, raw=True,
multiple=True)
if not body_piece.value:
break
body += body_piece.value
return body