The stream processing runtime helps you build event generators, stream processors, and transient reactions.
Configuring stream processing
To set up stream processing, you need the MessageHandler.Runtime.StreamProcessing extensions package.
PM> Install-Package MessageHandler.Runtime.StreamProcessing
Integrating into an existing handler runtime configuration
The most common scenario is to add stream processing as a capability to a host integrated handler runtime.
To make this easy, there is an extension method called StreamProcessingPipeline.
runtimeConfiguration.StreamProcessingPipeline(pipeline =>
{
});
Each invocation of this method will result in a new pipeline instance.
Once you have a pipeline configuration instance, you can start to configure it.
Pull messages from
The PullMessagesFrom callback lets you configure a message pump that pulls messages from a source. You can call this method multiple times to pump messages from several sources through the same pipeline.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.PullMessagesFrom(from => { });
});
Pull from an Azure Service Bus queue
To pull messages from an Azure Service Bus queue, use the Queue extension method on the pump configuration callback, to which you provide a queue name and a Service Bus connection string.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.PullMessagesFrom(from => from.Queue(name: "orderbooking", serviceBusConnectionString));
});
Pull from an Azure Service Bus topic
To pull messages from an Azure Service Bus subscription, use the Topic extension method on the pump configuration callback, to which you provide a topic name, a subscription name and a Service Bus connection string.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.PullMessagesFrom(from => from.Topic(name: "orderbooking.events", subscription: "orderbooking.worker", serviceBusConnectionString));
});
Pull from an event hub
To pull messages from an event hub, use the EventHub extension method on the pump configuration callback, to which you provide an event hub name, a consumer group name, an Event Hubs connection string, a storage connection string and a container name.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.PullMessagesFrom(from => from.EventHub("receivehub", "$Default", eventhubsconnectionstring, storageConnectionString, "leases"));
});
The storage connection string and container name are used to store read positions of the event hub.
Pull from an IoT Hub
To pull messages from an IoT hub, use the IoTHub extension method on the pump configuration callback, to which you provide an event hub name, a consumer group name, an Event Hubs connection string, a storage connection string and a container name.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.PullMessagesFrom(from => from.IoTHub("receivehub", "$Default", eventhubsconnectionstring, storageConnectionString, "leases"));
});
The storage connection string and container name are used to store read positions of the event hub.
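Because PullMessagesFrom can be called multiple times, several sources can feed one pipeline. The following is a minimal sketch that only combines the Queue and Topic calls shown above. It assumes that serviceBusConnectionString is defined elsewhere and that both calls can be made as separate statements inside the same callback.

```csharp
// Sketch only: two pumps feeding the same pipeline.
// serviceBusConnectionString is assumed to be defined elsewhere.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    // First pump: a Service Bus queue.
    streaming.PullMessagesFrom(from => from.Queue(name: "orderbooking", serviceBusConnectionString));

    // Second pump: a Service Bus topic subscription.
    streaming.PullMessagesFrom(from => from.Topic(name: "orderbooking.events", subscription: "orderbooking.worker", serviceBusConnectionString));
});
```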
Deserialize messages
By default the stream processing runtime will not deserialize messages at all.
If you want to deserialize incoming messages before processing them, call DeserializeMessagesWith and pass in an instance of a serializer, such as the JSonMessageSerializer.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.DeserializeMessagesWith(new JSonMessageSerializer());
});
Reactive processing
Configure the reactive processing logic by calling EnableReactiveProcessing and providing the corresponding optional processing parts.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.EnableReactiveProcessing(processing =>
    {
    });
});
Standing Query
The first processing part you can add to a pipeline is a standing query, which interprets, and potentially transforms, the messages on the stream.
You provide the standing query using InterpretMessagesUsing.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.EnableReactiveProcessing(processing =>
    {
        processing.InterpretMessagesUsing<AverageSensorValuesByDevice>();
    });
});
The type passed into InterpretMessagesUsing must implement IStandingQueryBuilder, which follows the builder pattern.
The builder should compose and return a LINQ query over the observable passed in.
This LINQ query can use any of the Reactive Extensions operators to filter, group, window or transform the incoming stream of messages.
public class AverageSensorValuesByDevice : IStandingQueryBuilder
{
    public IObservable<ActionableContext> Build(IObservable<IProcessingContext> stream)
    {
        // Group the stream per device, then buffer each group in 10-second windows.
        var query =
            from context in stream
            group context by context.Message.DeviceId into p
            from b in p.Buffer(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10))
            select new ActionableContext(b.Select(bf => bf.Processing).FirstOrDefault())
            {
                // A time-based buffer can be empty, so only aggregate when it has items.
                Message = b.Count() > 0 ? new
                {
                    Count = b.Count(),
                    Average = b.Average(m => m.Message.Value)
                } : null
            };
        return query;
    }
}
If you decide not to call InterpretMessagesUsing, all messages will simply pass through to the next stage as an action context.
Action
For each action context that gets returned from the standing query, the pipeline will invoke an action provided by RespondUsing.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.EnableReactiveProcessing(processing =>
    {
        processing.RespondUsing<ShowAverageSensorValue>();
    });
});
This action needs to implement the IAction interface.
public class ShowAverageSensorValue : IAction
{
    public async Task Action(ActionableContext context)
    {
        if (context.Message != null)
        {
            await Console.Out.WriteLineAsync("Average is " + context.Message.Average);
        }
        else
        {
            await Console.Out.WriteLineAsync("Query yielded no result");
        }
    }
}
Batch completion
Depending on the volume of messages that pass through the standing query, it may not be advisable to perform any IO in the action.
Instead, it is better to defer all IO until the incoming batch of messages is completed.
This can be achieved by providing an implementation of ICompleteBatches to the CompleteBatchesWith method.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.EnableReactiveProcessing(processing =>
    {
        processing.CompleteBatchesWith<DispatchAveragedSensorValues>();
    });
});
The implementation of ICompleteBatches is invoked after the action has been invoked for the last message of an incoming batch.
The context passed into the Complete method is the action context of the last message.
public class DispatchAveragedSensorValues : ICompleteBatches
{
    private readonly IDispatchMessages _dispatcher;

    public DispatchAveragedSensorValues(IDispatchMessages dispatcher)
    {
        _dispatcher = dispatcher;
    }

    public async Task Complete(IProcessingContext context)
    {
        await _dispatcher.Dispatch(new[] { context.Message });
    }
}
If you want to dispatch another batch of messages, consisting of messages generated during intermediate IAction invocations, you'll need to keep track of them as part of the IAction implementation.
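One possible way to keep track of such messages is a small thread-safe collector that the action fills and the batch completion drains. The sketch below is an illustration under assumptions: PendingMessages is a hypothetical helper and not part of the runtime, and how a single instance would be shared between the IAction and ICompleteBatches implementations (for example through constructor injection, as with IDispatchMessages above) is not confirmed by this page.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper: collects messages produced while handling a batch
// so that they can be dispatched together once the batch completes.
public class PendingMessages
{
    private readonly ConcurrentQueue<object> _pending = new ConcurrentQueue<object>();

    // Intended to be called from the action for each message to dispatch later.
    public void Add(object message) => _pending.Enqueue(message);

    // Intended to be called on batch completion:
    // removes and returns everything collected so far.
    public object[] Drain()
    {
        var drained = new List<object>();
        while (_pending.TryDequeue(out var message))
        {
            drained.Add(message);
        }
        return drained.ToArray();
    }
}
```

Under these assumptions, the Action method would call Add for each result, and the Complete method would pass the array returned by Drain to the dispatcher instead of only the last message.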
Buffered dispatch
To dispatch messages from a stream processing pipeline, you need to enable buffered dispatching.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming
        .EnableBufferedDispatching(dispatching =>
        {
        });
});
Serializing messages
Outgoing messages can be serialized, if desired, using a serializer of choice.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming
        .EnableBufferedDispatching(dispatching =>
        {
            dispatching.SerializeMessagesWith(new JSonMessageSerializer());
        });
});
Routing messages
To route messages, call RouteMessages and specify one of the available destinations.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming
        .EnableBufferedDispatching(dispatching =>
        {
            dispatching.RouteMessages(to => { });
        });
});
To event hubs
Routing towards an event hub can be done by calling EventHub and specifying the name of the hub and a connection string pointing to the Event Hubs namespace.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming
        .EnableBufferedDispatching(dispatching =>
        {
            dispatching.RouteMessages(to => to.EventHub("sendhub", eventhubsnamespace));
        });
});
To queue
Routing towards a queue can be done by calling Queue and specifying the name of the queue and a connection string pointing to the Service Bus namespace.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming
        .EnableBufferedDispatching(dispatching =>
        {
            dispatching.RouteMessages(to => to.Queue("orderbooking", serviceBusConnectionString));
        });
});
To topic
Routing towards a topic can be done by calling Topic and specifying the name of the topic and a connection string pointing to the Service Bus namespace.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming
        .EnableBufferedDispatching(dispatching =>
        {
            dispatching.RouteMessages(to => to.Topic("orderbooking.events", serviceBusConnectionString));
        });
});
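Putting the pieces from this page together, a complete pipeline might look like the sketch below. It only reuses calls and types that appear in the snippets above. It assumes that they can all be combined as separate statements within one pipeline callback, that the three processing parts can be registered in the same EnableReactiveProcessing callback, and that the connection string variables are defined elsewhere.

```csharp
// Sketch only: assumes all parts shown on this page can be combined in one pipeline.
// eventhubsconnectionstring, storageConnectionString and eventhubsnamespace
// are assumed to be defined elsewhere.
runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    // Source: pull raw messages from an event hub and deserialize them.
    streaming.PullMessagesFrom(from => from.EventHub("receivehub", "$Default", eventhubsconnectionstring, storageConnectionString, "leases"));
    streaming.DeserializeMessagesWith(new JSonMessageSerializer());

    // Processing: standing query, action per result, completion per batch.
    streaming.EnableReactiveProcessing(processing =>
    {
        processing.InterpretMessagesUsing<AverageSensorValuesByDevice>();
        processing.RespondUsing<ShowAverageSensorValue>();
        processing.CompleteBatchesWith<DispatchAveragedSensorValues>();
    });

    // Output: serialize dispatched messages and route them to another event hub.
    streaming.EnableBufferedDispatching(dispatching =>
    {
        dispatching.SerializeMessagesWith(new JSonMessageSerializer());
        dispatching.RouteMessages(to => to.EventHub("sendhub", eventhubsnamespace));
    });
});
```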