Source code for amqpstorm.exchange

"""AMQPStorm Channel.Exchange."""

import logging

from pamqp.specification import Exchange as pamqp_exchange

from amqpstorm import compatibility
from amqpstorm.base import Handler
from amqpstorm.exception import AMQPInvalidArgument

LOGGER = logging.getLogger(__name__)


[docs]class Exchange(Handler): """RabbitMQ Exchange Operations.""" __slots__ = []
[docs] def declare(self, exchange='', exchange_type='direct', passive=False, durable=False, auto_delete=False, arguments=None): """Declare an Exchange. :param str exchange: Exchange name :param str exchange_type: Exchange type :param bool passive: Do not create :param bool durable: Durable exchange :param bool auto_delete: Automatically delete when not in use :param dict arguments: Exchange key/value arguments :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :rtype: dict """ if not compatibility.is_string(exchange): raise AMQPInvalidArgument('exchange should be a string') elif not compatibility.is_string(exchange_type): raise AMQPInvalidArgument('exchange_type should be a string') elif not isinstance(passive, bool): raise AMQPInvalidArgument('passive should be a boolean') elif not isinstance(durable, bool): raise AMQPInvalidArgument('durable should be a boolean') elif not isinstance(auto_delete, bool): raise AMQPInvalidArgument('auto_delete should be a boolean') elif arguments is not None and not isinstance(arguments, dict): raise AMQPInvalidArgument('arguments should be a dict or None') declare_frame = pamqp_exchange.Declare(exchange=exchange, exchange_type=exchange_type, passive=passive, durable=durable, auto_delete=auto_delete, arguments=arguments) return self._channel.rpc_request(declare_frame)
[docs] def delete(self, exchange='', if_unused=False): """Delete an Exchange. :param str exchange: Exchange name :param bool if_unused: Delete only if unused :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :rtype: dict """ if not compatibility.is_string(exchange): raise AMQPInvalidArgument('exchange should be a string') delete_frame = pamqp_exchange.Delete(exchange=exchange, if_unused=if_unused) return self._channel.rpc_request(delete_frame)
[docs] def bind(self, destination='', source='', routing_key='', arguments=None): """Bind an Exchange. :param str destination: Exchange name :param str source: Exchange to bind to :param str routing_key: The routing key to use :param dict arguments: Bind key/value arguments :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :rtype: dict """ if not compatibility.is_string(destination): raise AMQPInvalidArgument('destination should be a string') elif not compatibility.is_string(source): raise AMQPInvalidArgument('source should be a string') elif not compatibility.is_string(routing_key): raise AMQPInvalidArgument('routing_key should be a string') elif arguments is not None and not isinstance(arguments, dict): raise AMQPInvalidArgument('arguments should be a dict or None') bind_frame = pamqp_exchange.Bind(destination=destination, source=source, routing_key=routing_key, arguments=arguments) return self._channel.rpc_request(bind_frame)
[docs] def unbind(self, destination='', source='', routing_key='', arguments=None): """Unbind an Exchange. :param str destination: Exchange name :param str source: Exchange to unbind from :param str routing_key: The routing key used :param dict arguments: Unbind key/value arguments :raises AMQPInvalidArgument: Invalid Parameters :raises AMQPChannelError: Raises if the channel encountered an error. :raises AMQPConnectionError: Raises if the connection encountered an error. :rtype: dict """ if not compatibility.is_string(destination): raise AMQPInvalidArgument('destination should be a string') elif not compatibility.is_string(source): raise AMQPInvalidArgument('source should be a string') elif not compatibility.is_string(routing_key): raise AMQPInvalidArgument('routing_key should be a string') elif arguments is not None and not isinstance(arguments, dict): raise AMQPInvalidArgument('arguments should be a dict or None') unbind_frame = pamqp_exchange.Unbind(destination=destination, source=source, routing_key=routing_key, arguments=arguments) return self._channel.rpc_request(unbind_frame)