The atomic processing runtime helps you build guaranteed reactions, projections, delegations, and simple forms of task processing.
Configuring atomic processing
To set up atomic processing, you need the MessageHandler.Runtime.AtomicProcessing extensions package.
PM> Install-Package MessageHandler.Runtime.AtomicProcessing
Integrating into an existing handler runtime configuration
The most common scenario is to add atomic processing as a capability to a host-integrated handler runtime.
To make this easy, there is an extension method called AtomicProcessingPipeline.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
});
Each invocation of this method will result in a new pipeline instance.
Once you have a pipeline configuration instance, you can start to configure it.
Pull messages from
The PullMessagesFrom callback lets you configure a message pump that pulls messages from a source. You can call this method multiple times to pump messages from multiple sources through the same pipeline.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
    pipeline.PullMessagesFrom(p => { });
});
Pull from an Azure Service Bus queue
To pull messages from an Azure Service Bus queue, use the Queue extension method on the pump configuration callback, passing a queue name and a Service Bus connection string.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
    pipeline.PullMessagesFrom(p => p.Queue(name: "orderbooking", serviceBusConnectionString));
});
Pull from an Azure Service Bus topic
To pull messages from an Azure Service Bus topic subscription, use the Topic extension method on the pump configuration callback, passing a topic name, a subscription name, and a Service Bus connection string.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
    pipeline.PullMessagesFrom(p => p.Topic(name: "orderbooking.events", subscription: "orderbooking.worker", serviceBusConnectionString));
});
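Because PullMessagesFrom can be called more than once, a queue pump and a topic pump can feed the same pipeline. The fragment below is a minimal sketch that only combines the two calls shown above; it assumes runtimeConfiguration and serviceBusConnectionString are already defined as in the earlier snippets.

```csharp
// Sketch: two message pumps feeding one pipeline.
// Assumes runtimeConfiguration and serviceBusConnectionString exist in scope.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
    // First pump: pulls from the "orderbooking" queue.
    pipeline.PullMessagesFrom(p => p.Queue(name: "orderbooking", serviceBusConnectionString));

    // Second pump: pulls from the "orderbooking.worker" subscription on the "orderbooking.events" topic.
    pipeline.PullMessagesFrom(p => p.Topic(name: "orderbooking.events", subscription: "orderbooking.worker", serviceBusConnectionString));
});
```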
Detecting message types
Not all .NET types represent messages.
When you need fine-grained control over which messages are processed by the pipeline, you can register them individually.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
    pipeline.DetectType<BookingStarted>();
});
Alternatively, you can register all types in a contract assembly at once.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
    pipeline.DetectTypesInAssembly(typeof(BookingStarted).Assembly);
});
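For illustration, a message contract such as BookingStarted might be a plain class kept in its own contracts assembly. The shape below is hypothetical: the namespace and the BookingId property are assumptions made for this sketch, and the documentation above does not state what members or base types, if any, the runtime expects.

```csharp
// Hypothetical contract type, living in a separate contracts assembly.
// The namespace and the BookingId property are illustrative assumptions.
namespace OrderBooking.Contracts
{
    public class BookingStarted
    {
        // Identifier of the booking that was started (assumed property).
        public string BookingId { get; set; }
    }
}
```

Keeping contracts together in one assembly is what makes the DetectTypesInAssembly call convenient, since a single registration then covers every message type.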
Registering a handler
Handlers can be registered with the HandleMessagesWith extension method.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
    pipeline.HandleMessagesWith<SendNotificationMail>();
});
Route messages
When handlers want to send messages, a route must be defined for the destination. You can do this with the RouteMessages extension method.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
    pipeline.RouteMessages(to => {});
});
Route to an Azure Service Bus queue
You can route messages from a pipeline to an Azure Service Bus queue using the Queue extension method on the destination configuration callback, passing a queue name and a Service Bus connection string.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
    pipeline.RouteMessages(to => to.Queue("orderbooking", serviceBusConnectionString));
});
Route to an Azure Service Bus topic
You can route messages from a pipeline to an Azure Service Bus topic using the Topic extension method on the destination configuration callback, passing a topic name and a Service Bus connection string.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
    pipeline.RouteMessages(to => to.Topic("orderbooking.events", serviceBusConnectionString));
});
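Putting it together
The individual steps can be combined into a single pipeline configuration. The fragment below is a sketch that uses only the calls shown in the sections above; it assumes runtimeConfiguration, serviceBusConnectionString, the BookingStarted contract, and the SendNotificationMail handler are defined elsewhere. The source does not say whether the order of these calls matters, so the order here simply mirrors the order of the sections.

```csharp
// Sketch: one pipeline that pulls, detects, handles, and routes.
// Assumes runtimeConfiguration, serviceBusConnectionString, BookingStarted,
// and SendNotificationMail are defined elsewhere in the application.
runtimeConfiguration.AtomicProcessingPipeline(pipeline =>
{
    // Source: pull messages from the "orderbooking" queue.
    pipeline.PullMessagesFrom(p => p.Queue(name: "orderbooking", serviceBusConnectionString));

    // Contracts: register every type in the contract assembly.
    pipeline.DetectTypesInAssembly(typeof(BookingStarted).Assembly);

    // Handler: process detected messages.
    pipeline.HandleMessagesWith<SendNotificationMail>();

    // Destination: messages sent by handlers go to the "orderbooking.events" topic.
    pipeline.RouteMessages(to => to.Topic("orderbooking.events", serviceBusConnectionString));
});
```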