MessageHandler MH Sign Up

How to configure the Event Sourcing Runtime

C

The event sourcing runtime, helps you to build aggregate roots and projections, which are both patterns that work on event streams.

Configuring event sourcing

To set up the event sourcing from a stream, you need one of the MessageHandler.EventSourcing extensions.

Currently there is only one, being MessageHandler.EventSourcing.AzureTableStorage.

PM> Install-Package MessageHandler.EventSourcing.AzureTableStorage

Integrating into an existing handler runtime configuration

The most common scenario is to add event sourcing as a capability to a host integrated handler runtime.

To make this easy, there is an extension method called EventSourcing.

runtimeConfiguration.EventSourcing(source => {

}

Setting up the event stream

Once you have a configuration instance, you can start to configure the event stream.

As a first parameter to the Stream configuration function, you pass in the name of the stream.

The second parameter provides a callback where you want to stream from.

The MessageHandler.EventSourcing.AzureTableStorage provides an extension method to stream from AzureTableStorage, by providing a connection string and a table name.

runtimeConfiguration.EventSourcing(source =>
{
    source.Stream(nameof(OrderBooking),
        from => from.AzureTableStorage(connectionString, nameof(OrderBooking)),
        into =>
        {

        });
});

The third parameter allows you to configure where you want to stream events to.

Enabling an aggregate root

Manipulation of a stream is performed through an aggregate root pattern.

To stream into this aggregate, use the Aggregate extension method and provide the type of the aggregate root.

runtimeConfiguration.EventSourcing(source =>
{
    source.Stream(nameof(OrderBooking),
        from => from.AzureTableStorage(connectionString, nameof(OrderBooking)),
        into =>
        {
            into.Aggregate<OrderBooking>();
        });
});

Enabling projections

To stream int a projection, use the Projection extension method and provide the type of the projection logic.

runtimeConfiguration.EventSourcing(source =>
{
    source.Stream(nameof(OrderBooking),
        from => from.AzureTableStorage(connectionString, nameof(OrderBooking)),
        into =>
        {
            into.Projection<BookingProjection>();
        });
});

Adding transient channels

Whenever you need to dispatch persisted messages to a transient destination, you can add the channel towards that destination using the UseTransientChannel extension method.

This method is extending the aggregate configuration, as only aggregates are allowed to add events to the stream.

runtimeConfiguration.EventSourcing(source =>
{
    source.Stream(nameof(OrderBooking),
        from => from.AzureTableStorage(connectionString, nameof(OrderBooking)),
        into =>
        {
            into.Aggregate<OrderBooking>()
                .UseChannel<ForwardToSignalr>();
        });
});

The Push method of this channel will be called after events have been flushed to the event store.

public class ForwardToSignalr : MessageHandler.EventSourcing.DomainModel.IChannel
{
    public Task Push(IEnumerable<SourcedEvent> events)
    {
           
    }
}

Note: Channels are only to be used for transient communication, and are not to be considered guaranteed or durable. Any exceptions thrown from them will only be logged, but then ignored.

Adding the outbox, an atomic channel

When you need durable, atomic, communication from the event store towards a destination, use the outbox instead.

The outbox can be found in the MessageHandler.EventSourcing.Outbox package.

PM> Install-Package MessageHandler.EventSourcing.Outbox

The outbox integrates the event sourcing runtime with the atomic processing runtime, where the event store acts as an input for a message pump towards the atomic dispatching pipeline.

runtimeConfiguration.EventSourcing(source =>
{
    source.Stream(nameof(OrderBooking),
        from => from.AzureTableStorage(connectionString, nameof(OrderBooking)),
        into =>
        {
            into.Aggregate<OrderBooking>()
                EnableOutbox("Owner", pipeline =>
                {
                    pipeline.RouteMessages(to => to.Topic("your.topic", "ServiceBusConnectionString"));
                });
        });
});

Owner represents the instance of the pump, if not specified it will default to the handler instance id.

The pipeline callback, passed in, represent an outbound atomic dispatching pipeline configuration, which can be configured using the atomic processing configuration.

Manually creating the runtime

In rare situations, you may not want to tie the eventsource to an existing handler runtime, but use it standalone instead.

This is also possible by manually instantiating an EventsourcingConfiguration instance.

var eventsourcingConfiguration = new EventsourcingConfiguration();

After configuring it, you need to start the runtime manually and resolve the main enrypoints from the runtime object.

var runtime = EventsourcingRuntime.Create(eventsourcingConfiguration);

var repository = runtime.CreateAggregateRepository<OrderBooking>();
var projectionInvoker = runtime.CreateProjectionInvoker<Booking>();
var projectionRestorer = runtime.CreateProjectionRestorer<Booking>();

Sign up to our newsletter to get notified about new content and releases

You can unsubscribe at any time by clicking the link in the footer of your emails. I use Mailchimp as my marketing platform. By clicking subscribe, you acknowledge that your information will be transferred to Mailchimp for processing. Learn more about Mailchimp's privacy practices here.