The stream processing runtime, helps you to dispatch messages from anywhere (outside the context of a stream processor), through a messages called buffered dispatching.
Configuring buffered dispatching
To set up buffered dispatching, you need the MessageHandler.Runtime.StreamProcessing extensions package.
PM> Install-Package MessageHandler.Runtime.StreamProcessing
Integrating into an existing handler runtime configuration
The most common scenario is to add buffered dispatching as a capability to a host integrated handler runtime.
To make this easy, there is an extension method called BufferedDispatchingPipeline
.
runtimeConfiguration.BufferedDispatchingPipeline(dispatching =>
{
});
Each invocation of this method will result in a new dispatching pipeline instance.
Once you have a pipeline configuration instance, you can start to configure it.
Serialize messages
By default the buffered dispatching pipeline, dispatches messages as byte array or string, without serializing them (implying that typed messages are not useable by default).
Serialize messages as json
If you want to serialize typed messages with a json serializer, you can use the SerializeMessagesWith
extension method on the dispatching pipeline and pass in an instance of the JSonMessageSerializer
.
runtimeConfiguration.BufferedDispatchingPipeline(dispatching =>
{
dispatching.SerializeMessagesWith(new JSonMessageSerializer());
});
Route messages
When you want to send a message outside the context of a stream processor, a route needs to be defined for the destination, you can do this with the RouteMessages
.
runtimeConfiguration.BufferedDispatchingPipeline(dispatching =>
{
dispatching.RouteMessages(to => {});
});
Route to an event hub
You can route messages from a pipeline to an azure eventhub, using the EventHub
extension method on the destination configuration callback, to which you can provide an event hub name and event hubs namespace connection string.
runtimeConfiguration.BufferedDispatchingPipeline(dispatching =>
{
dispatching.RouteMessages(to => to.EventHub("hubname", eventhubsnamespace));
});
Route to an azure servicebus queue
You can route messages from a pipeline to an azure service bus queue, using the Queue
extension method on the destination configuration callback, to which you can provide a queue name and service bus connection string.
runtimeConfiguration.BufferedDispatchingPipeline(dispatching =>
{
dispatching.RouteMessages(to => to.Queue("queuename", serviceBusConnectionString));
});
Route to an azure servicebus topic
You can route messages from a pipeline to an azure service bus topic, using the Topic
extension method on the destination configuration callback, to which you can provide a topic name and service bus connection string.
runtimeConfiguration.BufferedDispatchingPipeline(dispatching =>
{
dispatching.RouteMessages(to => to.Topic("topicname", serviceBusConnectionString));
});