MessageHandler MH Sign Up

Stream Processing Runtime Configuration

C

The stream processing runtime, helps you to build event generators, stream processors, and transient reactions.

Configuring stream processing

To set up stream processing, you need the MessageHandler.Runtime.StreamProcessing extensions package.

PM> Install-Package MessageHandler.Runtime.StreamProcessing

Integrating into an existing handler runtime configuration

The most common scenario is to add stream processing as a capability to a host integrated handler runtime.

To make this easy, there is an extension method called StreamProcessingPipeline.

 runtimeConfiguration.StreamProcessingPipeline(pipeline =>
{
    
});

Each invocation of this method will result in a new pipeline instance.

Once you have a pipeline configuration instance, you can start to configure it.

Pull messages from

The PullMessagesFrom callback allows to configure a message pump to pull message from a source. You can call this method multiple times to pump all messages through the same pipeline.

runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.PullMessagesFrom(from => { })
}

Pull from an azure servicebus queue

To pull message from an azure service bus queue, there is the Queue extension method on the pump configuration callback, to which you can provide a queue name and service bus connection string.

runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.PullMessagesFrom(from => from.Queue(name: "orderbooking", serviceBusConnectionString));
}

Pull from an azure servicebus topic

To pull message from an azure service bus subscription, there is the Topic extension method on the pump configuration callback, to which you can provide a topic name, subscription name and service bus connection string.

 runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.PullMessagesFrom(from => from.Topic(name: "orderbooking.events", subscription: "orderbooking.worker", serviceBusConnectionString));
});

Pull from an eventhub

To pull message from an event hub, there is the EventHub extension method on the pump configuration callback, to which you can provide an eventhub name, consumer group name and event hubs connection string, storage connection string and container name.

 runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.PullMessagesFrom(from => from.EventHub("receivehub", "$Default", eventhubsconnectionstring, storageConnectionString, "leases"));
});

The storage connection string and container name are used to store read positions of the event hub.

Pull from an IoT Hub

To pull message from an IoT hub, there is the EventHub extension method on the pump configuration callback, to which you can provide an eventhub name, consumer group name and event hubs connection string, storage connection string and container name.

 runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.PullMessagesFrom(from => from.IoTHub("receivehub", "$Default", eventhubsconnectionstring, storageConnectionString, "leases"));
});

The storage connection string and container name are used to store read positions of the event hub.

Deserialize messages

By default the stream processing runtime will not deserialize messages at all.

If you want to deserialize the incoming messages prior to processing it, you can call DeserializeMessagesWith and pass in an instance of a serializer, such as the JSonMessageSerializer.

 runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.DeserializeMessagesWith(new JSonMessageSerializer())
});

Reactive processing

Configuring the reactive processing logic can be done by calling EnableReactiveProcessing and providing the corresponding optional processing parts.

 runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.EnableReactiveProcessing(processing => { 

    });
});

Standing Query

The first processing part you can add to a a pipeline is a standing query which will interpret, and potentially transform, the messages on the stream.

Providing the standing query is done using InterpretMessagesUsing.

 runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.EnableReactiveProcessing(processing => { 
        processing.InterpretMessagesUsing<AverageSensorValuesByDevice>();
    });
});

The type passed into InterpretMessagesUsing must implement IStandingQueryBuilder, which represents a builder pattern.

This builder should compose and return a linq query for the observable passed in.

This linq query can use any of the reactive extension to filter, group, window or transform the incoming stream of messages.

public class AverageSensorValuesByDevice : IStandingQueryBuilder
{
    public IObservable<ActionableContext> Build(IObservable<IProcessingContext> stream)
    {
        var query = 
            from context in stream
            group context by context.Message.DeviceId into p
            from b in p.Buffer(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10))
            select new ActionableContext(b.Select(bf => bf.Processing).FirstOrDefault())
            {
                Message = b.Count() > 0 ? new
                {
                    Count = b.Count(),
                    Average = b.Average(m => m.Message.Value)
                } : null
            };
        return query;
    }
}

If you decide not to call InterpretMessagesUsing all messages will simply pass through to the next stage as an action context.

Action

For each action context that gets returned from the standing query, the pipeline will invoke an action provided by RespondUsing.

 runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.EnableReactiveProcessing(processing => { 
       processing.RespondUsing<ShowAverageSensorValue>();
    });
});

This action needs to implement the IAction interface.

public class ShowAverageSensorValue : IAction
{
    public async Task Action(ActionableContext context)
    {
        if (context.Message != null)
        {
            await Console.Out.WriteLineAsync("Average is " + context.Message.Average);
        }
        else
        {
            await Console.Out.WriteLineAsync("Query yielded no result");
        }
    }
}

Batch completion

Depending on the volume of message that passes through the standing query it may not be advised to perform any IO in the action.

Instead it would be better to delay all IO to the point when the incoming batch of messages is completed.

This can be achieved by providing an implementation of ICompleteBatches to the CompleteBatchesWith method.

 runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming.EnableReactiveProcessing(processing => { 
       processing.CompleteBatchesWith<DispatchAveragedSensorValues>();
    });
});

The implementation of ICompleteBatches will be invoked after the action on the last message of an incoming batch is invoked.

The context passed into the Complete method will be the ActionContext of the last message.

public class DispatchAveragedSensorValues : ICompleteBatches
{
    private readonly IDispatchMessages _dispatcher;

    public DispatchAveragedSensorValues(IDispatchMessages dispatcher)
    {
        _dispatcher = dispatcher;
    }

    public async Task Complete(IProcessingContext context)
    {
        await _dispatcher.Dispatch(new[] { context.Message });
    }
}

If you want to dispatch another batch of messages, consisting of message generated during intermediate IAction invocations, you'll need to keep track of them as part of the Iaction implementation.

Buffered dispatch

In order to be able to dispatch message from a stream processing pipeline, you'll need to be enable buffered dispatching.

runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming       
        .EnableBufferedDispatching(dispatching =>
        {
            
        });
});

Serializing messages

Outgoing messages can be serialized, if desired, using a serializer of choice.

runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming       
        .EnableBufferedDispatching(dispatching =>
        {
            dispatching.SerializeMessagesWith(new JSonMessageSerializer());
        });
});

Routing messages

To route messages call RouteMessages and specify one of the available destinations.

runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming       
        .EnableBufferedDispatching(dispatching =>
        {
            dispatching.RouteMessages(to => { });
        });
});
To event hubs

Routing towards an event hub can be done by calling to EventHub and specify the name of the hub and a connection string pointing towards the event hubs namespace.

runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming       
        .EnableBufferedDispatching(dispatching =>
        {
            dispatching.RouteMessages(to => to.EventHub("sendhub", eventhubsnamespace));
        });
});
To queue

Routing towards a queue can be done by calling to Queue and specify the name of the queue and a connection string pointing towards the servicebus namespace.

runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming       
        .EnableBufferedDispatching(dispatching =>
        {
            dispatching.RouteMessages(to => to.Queue("orderbooking", serviceBusConnectionString));
        });
});
To topic

Routing towards a topic can be done by calling to Topic and specify the name of the topic and a connection string pointing towards the servicebus namespace.

runtimeConfiguration.StreamProcessingPipeline(streaming =>
{
    streaming       
        .EnableBufferedDispatching(dispatching =>
        {
            dispatching.RouteMessages(to => to.Topic("orderbooking.events", serviceBusConnectionString));
        });
});

Sign up to our newsletter to get notified about new content and releases

You can unsubscribe at any time by clicking the link in the footer of your emails. I use Mailchimp as my marketing platform. By clicking subscribe, you acknowledge that your information will be transferred to Mailchimp for processing. Learn more about Mailchimp's privacy practices here.