Olivier Coanet edited this page Feb 5, 2022 · 12 revisions

The consumers are implemented using event handlers.

Event processing

For every event handler, the disruptor will run a dedicated thread with the following pseudo-code:

var nextSequence = 0L;
while (IsRunning)
{
    // (1) Waits for the next event to be:
    // - published and
    // - available (if other event handlers are configured to run before this one).
    var availableSequence = WaitForAvailableSequence(nextSequence);

    // (2) Gets the events and invokes the event handler.
    while (nextSequence <= availableSequence)
    {
        var evt = RingBuffer[nextSequence];
        var endOfBatch = nextSequence == availableSequence;
        EventHandler.Handle(evt, endOfBatch);
        nextSequence++;
    }

    // (3) Marks the sequence as available, typically to allow following event handlers to process the event.
    SetSequenceAsAvailable(availableSequence);
}

Step (1) is the only part that requires synchronization. It can be lock-based or lock-free depending on your wait strategy choice.

It is often the case that multiple events are available, because multiple events are published together or simply because individual events are published faster than the handler can process them. The list of available events is called a batch. The goal of the inner loop (2) is to process the batch as fast as possible, without any synchronization. The handler is notified of the end of the batch.

Step (3) is a simple write of the current sequence and does not use any synchronization.

Event handler interfaces

IEventHandler<T>

void OnEvent(T data, long sequence, bool endOfBatch);

It is the simplest interface, with a method that is invoked for every event. The endOfBatch parameter indicates that the event is the last of the current batch, which can be very helpful for I/O-based operations or to implement conflation.

Note that there is an equivalent interface for value type disruptors: IValueEventHandler<T>.
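For example, a handler that persists events can buffer writes and flush only once per batch, using endOfBatch as the trigger. A minimal sketch (PriceEvent and the flush logic are illustrative; in a real project the class would be declared as implementing Disruptor's IEventHandler&lt;PriceEvent&gt;):

```csharp
using System.Collections.Generic;

public class PriceEvent
{
    public double Price;
}

// Illustrative handler: in a real project it would be declared as
// `public class PriceWriter : IEventHandler<PriceEvent>`.
public class PriceWriter
{
    private readonly List<double> _pending = new();
    public int FlushCount;

    public void OnEvent(PriceEvent data, long sequence, bool endOfBatch)
    {
        // Buffer the event instead of performing one write per event.
        _pending.Add(data.Price);

        if (endOfBatch)
        {
            // Perform a single (potentially expensive) write for the whole batch.
            FlushCount++;
            _pending.Clear();
        }
    }
}
```

This pattern turns N per-event writes into one write per batch, which is exactly what endOfBatch is designed to enable.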

IBatchEventHandler<T>

void OnBatch(EventBatch<T> batch, long sequence);

Here the method will be invoked once per batch. EventBatch<T> is very similar to Span<T> except that it is not a ref struct. This handler has better performance than the default IEventHandler<T> for processing batches of multiple events. It is also much more convenient for explicit batch management.
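A minimal sketch of a batch handler (the event type and counters are illustrative, and a PriceEvent[] stands in for the library's EventBatch&lt;PriceEvent&gt;, which is enumerable in the same way; in a real project the class would implement Disruptor's IBatchEventHandler&lt;PriceEvent&gt;):

```csharp
public class PriceEvent
{
    public double Price;
}

// Illustrative batch handler: in a real project it would be declared as
// `public class BatchPriceWriter : IBatchEventHandler<PriceEvent>`
// and receive an EventBatch<PriceEvent> instead of an array.
public class BatchPriceWriter
{
    public int BatchCount;
    public int EventCount;

    public void OnBatch(PriceEvent[] batch, long sequence)
    {
        // One invocation per batch: batch-level work (I/O, conflation)
        // goes here instead of being guarded by an endOfBatch flag.
        BatchCount++;

        foreach (var evt in batch)
        {
            EventCount++;
        }
    }
}
```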

IAsyncBatchEventHandler<T>

ValueTask OnBatch(EventBatch<T> batch, long sequence);

This event handler is unusual in two ways:

  • The processing of this handler can generate heap allocations.
  • The processing of this handler runs on thread-pool threads (other handlers run on dedicated threads).

It is intended to be used in applications that can accept a reasonable amount of heap allocations and that use async APIs to process events.
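For example, an async handler can write each batch to an output stream using async I/O. A minimal sketch (LogEvent and the writer are illustrative, and a LogEvent[] stands in for the library's EventBatch&lt;LogEvent&gt;; in a real project the class would implement Disruptor's IAsyncBatchEventHandler&lt;LogEvent&gt;):

```csharp
using System.IO;
using System.Threading.Tasks;

public class LogEvent
{
    public string Message = "";
}

// Illustrative async batch handler: in a real project it would be declared as
// `public class AsyncLogWriter : IAsyncBatchEventHandler<LogEvent>`
// and receive an EventBatch<LogEvent> instead of an array.
public class AsyncLogWriter
{
    private readonly TextWriter _output;

    public AsyncLogWriter(TextWriter output) => _output = output;

    public async ValueTask OnBatch(LogEvent[] batch, long sequence)
    {
        foreach (var evt in batch)
        {
            // Awaiting here is fine: this handler runs on thread-pool threads.
            await _output.WriteLineAsync(evt.Message);
        }

        // One flush per batch instead of one per event.
        await _output.FlushAsync();
    }
}
```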

Additional event handler methods

The event handler interfaces contain additional methods which have default (empty) implementations.

OnStart and OnShutdown

void OnStart();
void OnShutdown();

These methods are invoked once per event handler: OnStart before any event is processed and OnShutdown after event processing is complete. They run on the event processing thread, which can be helpful to set up the thread:

public void OnStart()
{
    Thread.CurrentThread.Name = "MyHandler main loop";
    Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
}

OnTimeout

void OnTimeout(long sequence);

This method is invoked when the wait strategy generates a timeout.

Most wait strategies will only signal when events are available. But some of them can generate periodic timeouts. This is useful if you need to execute code periodically even in the absence of events, or if you need to execute code after an event was received (e.g.: to implement throttling).

Of course, implementing this method in your event handler is only effective if you use a timeout-based wait strategy like TimeoutBlockingWaitStrategy for sync handlers or AsyncWaitStrategy with the dedicated constructor for async handlers.
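For example, a handler can use OnTimeout to emit a heartbeat when no event arrived within the timeout. A minimal sketch (the heartbeat logic is illustrative; in a real project the class would implement IEventHandler&lt;T&gt; and the disruptor would be configured with a timeout-based wait strategy such as TimeoutBlockingWaitStrategy):

```csharp
// Illustrative handler: in a real project it would be declared as
// `public class HeartbeatHandler : IEventHandler<object>` (or your event type).
public class HeartbeatHandler
{
    public int HeartbeatCount;

    public void OnEvent(object data, long sequence, bool endOfBatch)
    {
        // Normal event processing.
    }

    // Invoked by the event processing loop when the wait strategy
    // times out while waiting for new events.
    public void OnTimeout(long sequence)
    {
        // No event arrived within the timeout: emit a heartbeat.
        HeartbeatCount++;
    }
}
```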

OnBatchStart

void OnBatchStart(long batchSize);

This method is invoked for each batch, before the first call to OnEvent. It is only available in IEventHandler&lt;T&gt; and IValueEventHandler&lt;T&gt;. This method was kept for compatibility, but if your handler needs to know the batch size, you should probably use IBatchEventHandler&lt;T&gt;.

Event handlers aggregation

The disruptor will create dedicated threads and event processing loops for each registered event handler.

disruptor.HandleEventsWith(new Handler1())
         .Then(new Handler2())
         .Then(new Handler3());

Here the disruptor will create 3 threads and 3 event processing loops. Having independent handlers can be required, for example if Handler2 is relatively slow and you need Handler1 to process events as fast as possible. But using independent handlers comes at a cost: it uses more CPU resources and the end-to-end latency is slightly higher because there is more coordination.

If both Handler1 and Handler2 are very fast and you want them to run in the same event processing loop, you can simply merge them into one handler, or use the AggregateEventHandler:

disruptor.HandleEventsWith(new AggregateEventHandler<Event>(new Handler1(), new Handler2()))
         .Then(new Handler3());

Making events available to next event handlers (early releasing)

If you look at the event processing pseudo-code, you will notice that events are set as available to the next handlers only at the end of the batch. This is important because it means that the current handler "owns" the events until the end of the batch.

But in some cases, events might need to be released sooner. For example, a slow handler tends to process large batches and therefore releases events very late: the first events of a batch only become available to the next event handlers at the end of the batch. Even if the slow handler is the last of the processing chain, this behavior can still be an issue, because the ring buffer can appear full when in fact some events are already processed and could be released.

The workaround is to implement IEventProcessorSequenceAware and to explicitly release events:

public class Handler : IEventHandler<Event>, IEventProcessorSequenceAware
{
    private ISequence _sequenceCallback;

    public void SetSequenceCallback(ISequence sequenceCallback)
    {
        _sequenceCallback = sequenceCallback;
    }

    public void OnEvent(Event data, long sequence, bool endOfBatch)
    {
        ProcessEvent(data);

        // Can be invoked for each event or using a custom logic.
        _sequenceCallback.SetValue(sequence);
    }

    private void ProcessEvent(Event data)
    {
        // ...
    }
}