Implementing your own router
A routing class publishes messages on behalf of a dispatch worker. To do so it must provide three dispatch functions: one for inbound user messages, one for outbound user messages and one for events (e.g. delivery reports and acknowledgements). Failure messages are not routed via dispatchers and are typically sent directly to a failure worker. The receiving of messages and events is handled by the dispatcher itself.
A dispatcher provides three dictionaries of publishers as attributes:
- exposed_publisher – publishers for sending inbound user messages to applications attached to the dispatcher.
- exposed_event_publisher – publishers for sending events to applications.
- transport_publisher – publishers for sending outbound user messages to transports attached to the dispatcher.
Each of these dictionaries is keyed by endpoint name. The keys for
exposed_publisher and exposed_event_publisher are the endpoints
listed in the exposed_names configuration option passed to the
dispatcher. The keys for transport_publisher are the endpoints
listed in the transport_names configuration option. Routing classes
publish messages by calling the publish_message() method on one of the
publishers in these three dictionaries.
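The relationship between configuration, endpoint names and publishers can be sketched as follows. This is only an illustration: StubPublisher and StubDispatcher are hypothetical stand-ins written for this example (they are not Vumi classes), the endpoint names are invented, and messages are plain dicts rather than real Vumi messages.

```python
class StubPublisher(object):
    """Hypothetical stand-in for a publisher; records messages instead of sending them."""

    def __init__(self):
        self.published = []

    def publish_message(self, msg):
        self.published.append(msg)


class StubDispatcher(object):
    """Hypothetical stand-in exposing the three publisher dictionaries."""

    def __init__(self, config):
        exposed = config.get('exposed_names', [])
        transports = config.get('transport_names', [])
        # One publisher per endpoint name, keyed by that name.
        self.exposed_publisher = {n: StubPublisher() for n in exposed}
        self.exposed_event_publisher = {n: StubPublisher() for n in exposed}
        self.transport_publisher = {n: StubPublisher() for n in transports}


config = {
    'transport_names': ['sms_transport'],
    'exposed_names': ['app1', 'app2'],
}
dispatcher = StubDispatcher(config)

# Inbound user message: look up an exposed endpoint and publish to it.
dispatcher.exposed_publisher['app1'].publish_message({'content': 'hello'})

# Outbound user message: look up a transport endpoint and publish to it.
dispatcher.transport_publisher['sms_transport'].publish_message(
    {'content': 'reply'})
```

A router never creates publishers itself in this sketch; it only chooses which key to look up.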
Routers are required to have the same interface as the BaseDispatchRouter class, which is described below.
class vumi.dispatchers.BaseDispatchRouter(dispatcher, config)
Base class for dispatch routing logic.
This class both defines the router interface and provides common functionality for router classes. You need not subclass it, and you should not instantiate it directly.
The __init__() method should take exactly the following options so that your class can be instantiated from configuration in a standard way:
Parameters:
- dispatcher (vumi.dispatchers.BaseDispatchWorker) – The dispatcher this routing class is part of.
- config (dict) – The configuration options passed to the dispatcher.
If you are subclassing this class, you should not override __init__(). Custom setup should be done in setup_routing() instead.
setup_routing()
Perform setup required for router.
Return type: Deferred or None
Returns: May return a Deferred that is called when setup is complete.
teardown_routing()
Perform teardown required for router.
Return type: Deferred or None
Returns: May return a Deferred that is called when teardown is complete.
dispatch_inbound_message(msg)
Dispatch an inbound user message to a publisher.
Parameters: msg (vumi.message.TransportUserMessage) – Message to dispatch.
dispatch_inbound_event(msg)
Dispatch an event to a publisher.
Parameters: msg (vumi.message.TransportEvent) – Message to dispatch.
dispatch_outbound_message(msg)
Dispatch an outbound user message to a publisher.
Parameters: msg (vumi.message.TransportUserMessage) – Message to dispatch.
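To make the interface above concrete, here is a minimal sketch of a custom router that performs its setup in setup_routing() rather than in __init__(). Everything in it is an assumption made for illustration: the BaseDispatchRouter shown is a simplified stand-in for the real class (assumed to store dispatcher and config as attributes), KeywordRouter and the keyword_mappings and fallback_name options are hypothetical, and messages are plain dicts instead of Vumi message objects.

```python
class RecordingPublisher(object):
    """Hypothetical publisher that records what it is asked to publish."""

    def __init__(self):
        self.published = []

    def publish_message(self, msg):
        self.published.append(msg)


class StubDispatcher(object):
    """Hypothetical dispatcher exposing the three publisher dictionaries."""

    def __init__(self, exposed_names, transport_names):
        self.exposed_publisher = {
            n: RecordingPublisher() for n in exposed_names}
        self.exposed_event_publisher = {
            n: RecordingPublisher() for n in exposed_names}
        self.transport_publisher = {
            n: RecordingPublisher() for n in transport_names}


class BaseDispatchRouter(object):
    """Simplified stand-in for vumi.dispatchers.BaseDispatchRouter."""

    def __init__(self, dispatcher, config):
        self.dispatcher = dispatcher
        self.config = config

    def setup_routing(self):
        pass

    def teardown_routing(self):
        pass


class KeywordRouter(BaseDispatchRouter):
    """Hypothetical router: routes inbound messages by their first word."""

    def setup_routing(self):
        # Build the lookup table once, here rather than in __init__().
        mappings = self.config.get('keyword_mappings', {})
        self.keyword_map = {kw.lower(): name for kw, name in mappings.items()}
        self.fallback = self.config.get('fallback_name')

    def teardown_routing(self):
        self.keyword_map = {}

    def dispatch_inbound_message(self, msg):
        words = (msg.get('content') or '').split()
        keyword = words[0].lower() if words else None
        name = self.keyword_map.get(keyword, self.fallback)
        if name in self.dispatcher.exposed_publisher:
            self.dispatcher.exposed_publisher[name].publish_message(msg)

    def dispatch_inbound_event(self, msg):
        # Events carry no content to inspect, so use the fallback endpoint.
        if self.fallback in self.dispatcher.exposed_event_publisher:
            self.dispatcher.exposed_event_publisher[
                self.fallback].publish_message(msg)

    def dispatch_outbound_message(self, msg):
        name = msg['transport_name']
        if name in self.dispatcher.transport_publisher:
            self.dispatcher.transport_publisher[name].publish_message(msg)


config = {
    'exposed_names': ['voting_app', 'default_app'],
    'transport_names': ['sms_transport'],
    'keyword_mappings': {'VOTE': 'voting_app'},
    'fallback_name': 'default_app',
}
dispatcher = StubDispatcher(config['exposed_names'], config['transport_names'])
router = KeywordRouter(dispatcher, config)
router.setup_routing()
router.dispatch_inbound_message(
    {'transport_name': 'sms_transport', 'content': 'vote yes'})
router.dispatch_inbound_message(
    {'transport_name': 'sms_transport', 'content': 'hi'})
router.dispatch_inbound_event({'transport_name': 'sms_transport'})
router.dispatch_outbound_message(
    {'transport_name': 'sms_transport', 'content': 'thanks'})
router.teardown_routing()
```

In this sketch the setup and teardown methods return None; a router that needs asynchronous setup could return a Deferred instead, as the method descriptions above allow.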
Example of a simple router implementation from vumi.dispatchers.base:
class SimpleDispatchRouter(BaseDispatchRouter):
    """Simple dispatch router that maps transports to apps.

    Configuration options:

    :param dict route_mappings:
        A map of *transport_names* to *exposed_names*. Inbound
        messages and events received from a given transport are
        dispatched to the application attached to the corresponding
        exposed name.

    :param dict transport_mappings: An optional re-mapping of
        *transport_names* to *transport_names*. By default, outbound
        messages are dispatched to the transport attached to the
        *endpoint* with the same name as the transport name given in
        the message. If a transport name is present in this
        dictionary, the message is instead dispatched to the new
        transport name given by the re-mapping.
    """

    def dispatch_inbound_message(self, msg):
        names = self.config['route_mappings'][msg['transport_name']]
        for name in names:
            # copy message so that the middleware doesn't see a particular
            # message instance multiple times
            self.dispatcher.publish_inbound_message(name, msg.copy())

    def dispatch_inbound_event(self, msg):
        names = self.config['route_mappings'][msg['transport_name']]
        for name in names:
            # copy message so that the middleware doesn't see a particular
            # message instance multiple times
            self.dispatcher.publish_inbound_event(name, msg.copy())

    def dispatch_outbound_message(self, msg):
        name = msg['transport_name']
        name = self.config.get('transport_mappings', {}).get(name, name)
        if name in self.dispatcher.transport_publisher:
            self.dispatcher.publish_outbound_message(name, msg)
        else:
            log.error(DispatcherError(
                'Unknown transport_name: %s, discarding %r' % (
                    name, msg.payload)))
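The lookups performed by the router above can be tried out without a running dispatcher. The sketch below mirrors only the name resolution, as two hypothetical helper functions (inbound_targets and outbound_target are not part of Vumi). The configuration is written as the dict a router would receive as config, with invented endpoint names. The values in route_mappings are assumed to be lists, since the example iterates over them; a bare string would be iterated character by character.

```python
# Hypothetical configuration; all endpoint names are invented for illustration.
config = {
    'transport_names': ['sms_a', 'sms_b'],
    'exposed_names': ['app_main', 'app_audit'],
    'route_mappings': {
        'sms_a': ['app_main', 'app_audit'],
        'sms_b': ['app_main'],
    },
    'transport_mappings': {'sms_old': 'sms_a'},
}


def inbound_targets(config, msg):
    """Exposed names that an inbound message or event would be copied to."""
    return list(config['route_mappings'][msg['transport_name']])


def outbound_target(config, msg):
    """Transport name an outbound message resolves to, after any re-mapping."""
    name = msg['transport_name']
    return config.get('transport_mappings', {}).get(name, name)


print(inbound_targets(config, {'transport_name': 'sms_a'}))
# prints ['app_main', 'app_audit']
print(outbound_target(config, {'transport_name': 'sms_old'}))
# prints sms_a
print(outbound_target(config, {'transport_name': 'sms_b'}))
# prints sms_b
```

An inbound message from a transport that is missing from route_mappings raises KeyError in this sketch, just as the dictionary lookup in the example router would.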