Source code for amqpstorm.management.connection

from amqpstorm.compatibility import json
from amqpstorm.compatibility import quote
from amqpstorm.management.base import ManagementHandler

# Path of the collection endpoint that returns every connection.
API_CONNECTIONS = 'connections'
# Path template for a single connection; '%s' is filled in with the
# connection name.
API_CONNECTION = API_CONNECTIONS + '/%s'


class Connection(ManagementHandler):
    """Management API operations on connections: get, list and close."""

    def get(self, connection):
        """Get Connection details.

        :param str connection: Connection name

        :raises ApiError: Raises if the remote server encountered an error.
        :raises ApiConnectionError: Raises if there was a connectivity issue.

        :rtype: dict
        """
        # Percent-encode the name (safe='' so that '/' is encoded as well)
        # to keep it a single URL path segment, exactly as close() does.
        connection = quote(connection, '')
        return self.http_client.get(API_CONNECTION % connection)

    def list(self):
        """Get Connections.

        :raises ApiError: Raises if the remote server encountered an error.
        :raises ApiConnectionError: Raises if there was a connectivity issue.

        :rtype: list
        """
        return self.http_client.get(API_CONNECTIONS)

    def close(self, connection, reason='Closed via management api'):
        """Close Connection.

        :param str connection: Connection name
        :param str reason: Reason for closing connection.

        :raises ApiError: Raises if the remote server encountered an error.
        :raises ApiConnectionError: Raises if there was a connectivity issue.

        :rtype: None
        """
        # The JSON body carries the raw (unquoted) name; only the URL path
        # segment built below is percent-encoded.
        close_payload = json.dumps({
            'name': connection,
            'reason': reason
        })
        connection = quote(connection, '')
        # The reason is sent both in the body and as the X-Reason header.
        return self.http_client.delete(
            API_CONNECTION % connection,
            payload=close_payload,
            headers={
                'X-Reason': reason
            }
        )