Implementing your own router

A routing class publishes messages on behalf of a dispatch worker. To do so, it must provide three dispatch functions: one for inbound user messages, one for outbound user messages and one for events (e.g. delivery reports and acknowledgements). Failure messages are not routed via dispatchers and are typically sent directly to a failure worker. The dispatcher itself handles the receiving of messages and events.

A dispatcher provides three dictionaries of publishers as attributes:

  • exposed_publisher – publishers for sending inbound user messages to applications attached to the dispatcher.

  • exposed_event_publisher – publishers for sending events to applications.

  • transport_publisher – publishers for sending outbound user messages to transports attached to the dispatcher.

Each of these dictionaries is keyed by endpoint name. The keys for exposed_publisher and exposed_event_publisher are the endpoints listed in the exposed_names configuration option passed to the dispatcher. The keys for transport_publisher are the endpoints listed in the transport_names configuration option. Routing classes publish messages by calling the publish_message() method on one of the publishers in these three dictionaries.
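
The lookup-then-publish pattern described above can be sketched as follows. This is a minimal illustration and not Vumi code: FakePublisher and FakeDispatcher are hypothetical stand-ins that only mimic the three dictionaries and the publish_message() method named in this section, and the endpoint names are invented.

```python
class FakePublisher(object):
    """Hypothetical stand-in for a publisher; records what it is given."""

    def __init__(self):
        self.published = []

    def publish_message(self, msg):
        self.published.append(msg)


class FakeDispatcher(object):
    """Hypothetical stand-in exposing the three publisher dictionaries."""

    def __init__(self, transport_names, exposed_names):
        # One publisher per endpoint name, keyed by that name.
        self.transport_publisher = dict(
            (name, FakePublisher()) for name in transport_names)
        self.exposed_publisher = dict(
            (name, FakePublisher()) for name in exposed_names)
        self.exposed_event_publisher = dict(
            (name, FakePublisher()) for name in exposed_names)


dispatcher = FakeDispatcher(
    transport_names=["sms_transport"], exposed_names=["app1", "app2"])

# Inbound user message: goes to an application endpoint.
dispatcher.exposed_publisher["app1"].publish_message({"content": "hi"})
# Event: goes to an application endpoint via the event publishers.
dispatcher.exposed_event_publisher["app1"].publish_message(
    {"event_type": "ack"})
# Outbound user message: goes to a transport endpoint.
dispatcher.transport_publisher["sms_transport"].publish_message(
    {"content": "reply"})
```

In a real dispatcher the messages would be vumi.message objects rather than plain dicts; dicts are used here only to keep the sketch self-contained.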

Routers are required to have the same interface as the BaseDispatchRouter class, which is described below.

class vumi.dispatchers.BaseDispatchRouter(dispatcher, config)

Base class for dispatch routing logic.

This class defines the router interface and provides a set of common functionality for router classes. You need not subclass it, and you should not instantiate it directly.

The __init__() method should take exactly the following options so that your class can be instantiated from configuration in a standard way:

Parameters:
  • dispatcher (vumi.dispatchers.BaseDispatchWorker) – The dispatcher this routing class is part of.
  • config (dict) – The configuration options passed to the dispatcher.

If you are subclassing this class, you should not override __init__(). Custom setup should be done in setup_routing() instead.

setup_routing()

Perform setup required for router.

Return type: Deferred or None
Returns: May return a Deferred that is called when setup is complete.

teardown_routing()

Perform teardown required for router.

Return type: Deferred or None
Returns: May return a Deferred that is called when teardown is complete.

dispatch_inbound_message(msg)

Dispatch an inbound user message to a publisher.

Parameters: msg (vumi.message.TransportUserMessage) – Message to dispatch.

dispatch_inbound_event(msg)

Dispatch an event to a publisher.

Parameters: msg (vumi.message.TransportEvent) – Message to dispatch.

dispatch_outbound_message(msg)

Dispatch an outbound user message to a publisher.

Parameters: msg (vumi.message.TransportUserMessage) – Message to dispatch.
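
The interface above, and the advice to put custom setup in setup_routing() rather than __init__(), can be sketched with a toy router. Everything here is an assumption made for illustration: StubBaseRouter stands in for BaseDispatchRouter (it is assumed only to store its two constructor arguments), RecordingDispatcher stands in for a dispatch worker, and the keyword_mappings and fallback option names are invented.

```python
class StubBaseRouter(object):
    """Stand-in for BaseDispatchRouter; stores dispatcher and config."""

    def __init__(self, dispatcher, config):
        self.dispatcher = dispatcher
        self.config = config

    def setup_routing(self):
        pass

    def teardown_routing(self):
        pass


class RecordingDispatcher(object):
    """Hypothetical dispatcher that records inbound publishes."""

    def __init__(self):
        self.inbound = []

    def publish_inbound_message(self, endpoint_name, msg):
        self.inbound.append((endpoint_name, msg))


class KeywordRouter(StubBaseRouter):
    """Hypothetical router: first word of the content picks the endpoint."""

    def setup_routing(self):
        # Custom setup lives here; __init__() is left untouched.
        mappings = self.config['keyword_mappings']
        self.keywords = dict((k.lower(), v) for k, v in mappings.items())
        self.fallback = self.config.get('fallback')

    def teardown_routing(self):
        self.keywords = {}

    def pick_endpoint(self, msg):
        words = (msg.get('content') or '').split()
        first = words[0].lower() if words else None
        return self.keywords.get(first, self.fallback)

    def dispatch_inbound_message(self, msg):
        name = self.pick_endpoint(msg)
        if name is not None:
            self.dispatcher.publish_inbound_message(name, msg)


dispatcher = RecordingDispatcher()
router = KeywordRouter(dispatcher, {
    'keyword_mappings': {'JOIN': 'signup_app', 'Help': 'help_app'},
    'fallback': 'default_app',
})
router.setup_routing()
router.dispatch_inbound_message({'content': 'join now'})
router.dispatch_inbound_message({'content': 'something else'})
```

A complete router would also provide dispatch_inbound_event() and dispatch_outbound_message(); they are omitted here to keep the sketch short.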

Example of a simple router implementation from vumi.dispatchers.base:

class SimpleDispatchRouter(BaseDispatchRouter):
    """Simple dispatch router that maps transports to apps.

    Configuration options:

    :param dict route_mappings:
        A map of *transport_names* to *exposed_names*. Inbound
        messages and events received from a given transport are
        dispatched to the application attached to the corresponding
        exposed name.

    :param dict transport_mappings: An optional re-mapping of
        *transport_names* to *transport_names*.  By default, outbound
        messages are dispatched to the transport attached to the
        *endpoint* with the same name as the transport name given in
        the message. If a transport name is present in this
        dictionary, the message is instead dispatched to the new
        transport name given by the re-mapping.
    """

    def dispatch_inbound_message(self, msg):
        names = self.config['route_mappings'][msg['transport_name']]
        for name in names:
            # copy message so that the middleware doesn't see a particular
            # message instance multiple times
            self.dispatcher.publish_inbound_message(name, msg.copy())

    def dispatch_inbound_event(self, msg):
        names = self.config['route_mappings'][msg['transport_name']]
        for name in names:
            # copy message so that the middleware doesn't see a particular
            # message instance multiple times
            self.dispatcher.publish_inbound_event(name, msg.copy())

    def dispatch_outbound_message(self, msg):
        name = msg['transport_name']
        name = self.config.get('transport_mappings', {}).get(name, name)
        if name in self.dispatcher.transport_publisher:
            self.dispatcher.publish_outbound_message(name, msg)
        else:
            log.error(DispatcherError(
                'Unknown transport_name: %s, discarding %r' % (
                    name, msg.payload)))
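
To make the two configuration options concrete, the sketch below mirrors the lookups performed by SimpleDispatchRouter using plain functions. The endpoint names are invented, and the list-valued route_mappings entries are an inference from the loop over names in dispatch_inbound_message() rather than something the docstring states; inbound_targets() and outbound_target() are hypothetical helpers, not part of Vumi.

```python
# Hypothetical configuration; all endpoint names are invented.
config = {
    'route_mappings': {
        'sms_transport': ['app1', 'app2'],
        'ussd_transport': ['app1'],
    },
    'transport_mappings': {
        'sms_transport': 'sms_backup_transport',
    },
}


def inbound_targets(config, msg):
    """Exposed names an inbound message or event would be copied to."""
    return config['route_mappings'][msg['transport_name']]


def outbound_target(config, msg):
    """Transport name an outbound message would be published to."""
    name = msg['transport_name']
    # Fall back to the message's own transport name when no re-mapping exists.
    return config.get('transport_mappings', {}).get(name, name)


sms = {'transport_name': 'sms_transport'}
ussd = {'transport_name': 'ussd_transport'}
```

With this configuration, an inbound message from sms_transport is copied to both app1 and app2, an outbound message addressed to sms_transport is redirected to sms_backup_transport, and an outbound message addressed to ussd_transport keeps its original transport name.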