Skip to content

Commit

Permalink
Remove sequence barrier interfaces
Browse files Browse the repository at this point in the history
- Use concrete types for sequence barriers.
- Add ISequenceBarrierOptions to allow compile-time specialization
  • Loading branch information
ocoanet committed May 13, 2022
1 parent 2e96f65 commit e2f8392
Show file tree
Hide file tree
Showing 71 changed files with 1,595 additions and 1,573 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Runtime.CompilerServices;
using BenchmarkDotNet.Attributes;
using Disruptor.Benchmarks.Reference;
using Disruptor.Processing;

namespace Disruptor.Benchmarks;
Expand All @@ -16,7 +15,6 @@ public class EventProcessorBenchmarks_ProcessFilledRingBuffer

private readonly RingBuffer<XEvent> _ringBuffer;
private IEventProcessor<XEvent> _processor;
private IEventProcessor<XEvent> _processorRef;
private IEventProcessor<XEvent> _batchProcessor;

public EventProcessorBenchmarks_ProcessFilledRingBuffer()
Expand All @@ -34,7 +32,6 @@ public EventProcessorBenchmarks_ProcessFilledRingBuffer()
public void Setup()
{
_processor = EventProcessorFactory.Create(_ringBuffer, _ringBuffer.NewBarrier(), new XEventHandler(() => _processor.Halt()));
_processorRef = EventProcessorFactory.Create(_ringBuffer, _ringBuffer.NewBarrier(), new XEventHandler(() => _processorRef.Halt()), typeof(EventProcessorRef<,,,>));
_batchProcessor = EventProcessorFactory.Create(_ringBuffer, _ringBuffer.NewBarrier(), new XBatchEventHandler(() => _batchProcessor.Halt()));
}

Expand All @@ -44,12 +41,6 @@ public void Run()
_processor.Run();
}

//[Benchmark(OperationsPerInvoke = _ringBufferSize)]
public void RunRef()
{
_processorRef.Run();
}

[Benchmark(OperationsPerInvoke = _ringBufferSize)]
public void RunBach()
{
Expand Down Expand Up @@ -105,4 +96,4 @@ public void OnBatch(EventBatch<XEvent> batch, long sequence)
_shutdown.Invoke();
}
}
}
}
173 changes: 84 additions & 89 deletions src/Disruptor.Benchmarks/EventProcessorBenchmarks_Wait.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,82 +15,73 @@ public class EventProcessorBenchmarks_Wait
private const int _operationsPerInvoke = 1000;

private readonly IPartialEventProcessor _processor1;
private readonly PartialEventProcessor<ValueSequenceBarrier, TimeoutDeactivated> _processor2;
private readonly IPartialEventProcessor _processor2;

public EventProcessorBenchmarks_Wait()
{
var sequencer = new SingleProducerSequencer(64, new YieldingWaitStrategy());
var sequenceBarrierProxy = StructProxy.CreateProxyInstance(sequencer.NewBarrier());
var eventProcessorType = typeof(PartialEventProcessor<,>).MakeGenericType(sequenceBarrierProxy.GetType(), typeof(TimeoutDeactivated));
_processor1 = (IPartialEventProcessor)Activator.CreateInstance(eventProcessorType, sequenceBarrierProxy, new TimeoutDeactivated());

var waitStrategy = new YieldingWaitStrategy();
var sequencer = new SingleProducerSequencer(64, waitStrategy);
var cursorSequence = new Sequence();
_processor2 = new PartialEventProcessor<ValueSequenceBarrier, TimeoutDeactivated>(new ValueSequenceBarrier(sequencer, new YieldingWaitStrategy(), cursorSequence), default);
var dependentSequences = new ISequence[0];
var sequenceBarrier = new SequenceBarrier(sequencer, waitStrategy, cursorSequence, dependentSequences);
var sequenceBarrierClass = new SequenceBarrierClass(sequencer, waitStrategy, cursorSequence, dependentSequences);
var sequenceBarrierProxy = StructProxy.CreateProxyInstance(sequenceBarrierClass);
var eventProcessorType = typeof(PartialEventProcessor<,>).MakeGenericType(typeof(ISequenceBarrierOptions.IsDependentSequencePublished), sequenceBarrierProxy.GetType());
_processor1 = (IPartialEventProcessor)Activator.CreateInstance(eventProcessorType, sequenceBarrier, sequenceBarrierProxy);

_processor2 = new PartialEventProcessor<ISequenceBarrierOptions.IsDependentSequencePublished, SequenceBarrierStruct>(sequenceBarrier, new SequenceBarrierStruct(sequencer, waitStrategy, cursorSequence, dependentSequences));

sequencer.Publish(42);
cursorSequence.SetValue(42);
}

[Benchmark(Baseline = true, OperationsPerInvoke = _operationsPerInvoke)]
public void ProcessingLoop_CheckTimeout()
public void ProcessingLoop_Default()
{
_processor1.ProcessingLoop_CheckTimeout(42);
_processor1.ProcessingLoop_Options(42);
}

// [Benchmark(Baseline = true, OperationsPerInvoke = _operationsPerInvoke)]
public void ProcessingLoop_CheckTimeoutAndCustomSequenceBarrier()
[Benchmark(OperationsPerInvoke = _operationsPerInvoke)]
public void ProcessingLoop_Typed_Class_Proxy()
{
_processor2.ProcessingLoop_CheckTimeout(42);
_processor1.ProcessingLoop_Typed(42);
}

// [Benchmark(OperationsPerInvoke = _operationsPerInvoke)]
public void ProcessingLoop_TimeoutActivationStruct()
[Benchmark(OperationsPerInvoke = _operationsPerInvoke)]
public void ProcessingLoop_Typed_Struct()
{
_processor1.ProcessingLoop_TimeoutActivationStruct(42);
}

// [Benchmark(OperationsPerInvoke = _operationsPerInvoke)]
public void ProcessingLoop_TimeoutActivationStructAndCustomSequenceBarrier()
{
_processor2.ProcessingLoop_TimeoutActivationStruct(42);
}

// [Benchmark(OperationsPerInvoke = _operationsPerInvoke)]
public void ProcessingLoop_IgnoreTimeout()
{
_processor1.ProcessingLoop_IgnoreTimeout(42);
_processor2.ProcessingLoop_Typed(42);
}

public interface IPartialEventProcessor
{
void ProcessingLoop_CheckTimeout(long nextSequence);
void ProcessingLoop_TimeoutActivationStruct(long nextSequence);
void ProcessingLoop_IgnoreTimeout(long nextSequence);
void ProcessingLoop_Options(long nextSequence);
void ProcessingLoop_Typed(long nextSequence);
}

/// <summary>
/// Partial copy of <see cref="EventProcessor{T, TDataProvider, TSequenceBarrier, TEventHandler, TBatchStartAware}"/>
/// </summary>
public sealed class PartialEventProcessor<TSequenceBarrier, TTimeoutActivation> : IPartialEventProcessor
public sealed class PartialEventProcessor<TSequenceBarrierOptions, TSequenceBarrier> : IPartialEventProcessor
where TSequenceBarrierOptions : ISequenceBarrierOptions
where TSequenceBarrier : ISequenceBarrier
where TTimeoutActivation : ITimeoutActivation
{
private readonly Sequence _sequence = new();
private TSequenceBarrier _sequenceBarrier;
private TTimeoutActivation _timeoutActivation;
private readonly SequenceBarrier _sequenceBarrier;
private TSequenceBarrier _typedSequenceBarrier;

public PartialEventProcessor(TSequenceBarrier sequenceBarrier, TTimeoutActivation timeoutActivation)
public PartialEventProcessor(SequenceBarrier sequenceBarrier, TSequenceBarrier typedSequenceBarrier)
{
_sequenceBarrier = sequenceBarrier;
_timeoutActivation = timeoutActivation;
_typedSequenceBarrier = typedSequenceBarrier;
}

[MethodImpl(MethodImplOptions.NoInlining | Constants.AggressiveOptimization)]
public void ProcessingLoop_CheckTimeout(long nextSequence)
public void ProcessingLoop_Options(long nextSequence)
{
for (var i = 0; i < _operationsPerInvoke; i++)
{
var waitResult = _sequenceBarrier.WaitFor(nextSequence);
var waitResult = _sequenceBarrier.WaitFor<TSequenceBarrierOptions>(nextSequence);
if (waitResult.IsTimeout)
{
HandleTimeout(_sequence.Value);
Expand All @@ -105,18 +96,15 @@ public void ProcessingLoop_CheckTimeout(long nextSequence)
}

[MethodImpl(MethodImplOptions.NoInlining | Constants.AggressiveOptimization)]
public void ProcessingLoop_TimeoutActivationStruct(long nextSequence)
public void ProcessingLoop_Typed(long nextSequence)
{
for (var i = 0; i < _operationsPerInvoke; i++)
{
var waitResult = _sequenceBarrier.WaitFor(nextSequence);
if (_timeoutActivation.Enabled)
var waitResult = _typedSequenceBarrier.WaitFor(nextSequence);
if (waitResult.IsTimeout)
{
if (waitResult.IsTimeout)
{
HandleTimeout(_sequence.Value);
return;
}
HandleTimeout(_sequence.Value);
return;
}

var availableSequence = waitResult.UnsafeAvailableSequence;
Expand All @@ -126,20 +114,6 @@ public void ProcessingLoop_TimeoutActivationStruct(long nextSequence)
}
}

[MethodImpl(MethodImplOptions.NoInlining | Constants.AggressiveOptimization)]
public void ProcessingLoop_IgnoreTimeout(long nextSequence)
{
for (var i = 0; i < _operationsPerInvoke; i++)
{
var waitResult = _sequenceBarrier.WaitFor(nextSequence);

var availableSequence = waitResult.UnsafeAvailableSequence;
Process(availableSequence);

_sequence.SetValue(availableSequence);
}
}

[MethodImpl(MethodImplOptions.NoInlining)]
private void HandleTimeout(long sequence)
{
Expand All @@ -151,59 +125,80 @@ private void Process(long sequence)
}
}

public interface ITimeoutActivation
public interface ISequenceBarrier
{
bool Enabled { get; }
SequenceWaitResult WaitFor(long sequence);
}

public struct TimeoutActivated : ITimeoutActivation
public sealed class SequenceBarrierClass : ISequenceBarrier
{
public bool Enabled => true;
}

public struct TimeoutDeactivated : ITimeoutActivation
{
public bool Enabled => false;
}

public struct ValueSequenceBarrier : ISequenceBarrier
{
private readonly SingleProducerSequencer _sequencer;
private readonly YieldingWaitStrategy _waitStrategy;
private readonly ISequencer _sequencer;
private readonly IWaitStrategy _waitStrategy;
private readonly DependentSequenceGroup _dependentSequences;
private volatile CancellationTokenSource _cancellationTokenSource;
private CancellationTokenSource _cancellationTokenSource;

public ValueSequenceBarrier(SingleProducerSequencer sequencer, YieldingWaitStrategy waitStrategy, Sequence cursorSequence)
public SequenceBarrierClass(ISequencer sequencer, IWaitStrategy waitStrategy, Sequence cursorSequence, ISequence[] dependentSequences)
{
_sequencer = sequencer;
_waitStrategy = waitStrategy;
_dependentSequences = new DependentSequenceGroup(cursorSequence);
_dependentSequences = new DependentSequenceGroup(cursorSequence, dependentSequences);
_cancellationTokenSource = new CancellationTokenSource();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
[MethodImpl(MethodImplOptions.AggressiveInlining | Constants.AggressiveOptimization)]
public SequenceWaitResult WaitFor(long sequence)
{
var cancellationToken = _cancellationTokenSource.Token;
cancellationToken.ThrowIfCancellationRequested();
_cancellationTokenSource.Token.ThrowIfCancellationRequested();

var result = _waitStrategy.WaitFor(sequence, _dependentSequences, cancellationToken);
var availableSequence = _dependentSequences.Value;
if (availableSequence >= sequence)
{
return availableSequence;
}

if (result.UnsafeAvailableSequence < sequence)
return result;
return InvokeWaitStrategy(sequence);
}

return _sequencer.GetHighestPublishedSequence(sequence, result.UnsafeAvailableSequence);
[MethodImpl(MethodImplOptions.NoInlining)]
private SequenceWaitResult InvokeWaitStrategy(long sequence)
{
return _waitStrategy.WaitFor(sequence, _dependentSequences, _cancellationTokenSource.Token);
}
}

public struct SequenceBarrierStruct : ISequenceBarrier
{
private readonly ISequencer _sequencer;
private readonly IWaitStrategy _waitStrategy;
private readonly DependentSequenceGroup _dependentSequences;
private CancellationTokenSource _cancellationTokenSource;

public DependentSequenceGroup DependentSequences => _dependentSequences;
public CancellationToken CancellationToken => _cancellationTokenSource.Token;
public SequenceBarrierStruct(ISequencer sequencer, IWaitStrategy waitStrategy, Sequence cursorSequence, ISequence[] dependentSequences)
{
_sequencer = sequencer;
_waitStrategy = waitStrategy;
_dependentSequences = new DependentSequenceGroup(cursorSequence, dependentSequences);
_cancellationTokenSource = new CancellationTokenSource();
}

public void ResetProcessing()
[MethodImpl(MethodImplOptions.AggressiveInlining | Constants.AggressiveOptimization)]
public SequenceWaitResult WaitFor(long sequence)
{
_cancellationTokenSource.Token.ThrowIfCancellationRequested();

var availableSequence = _dependentSequences.Value;
if (availableSequence >= sequence)
{
return availableSequence;
}

return InvokeWaitStrategy(sequence);
}

public void CancelProcessing()
[MethodImpl(MethodImplOptions.NoInlining)]
private SequenceWaitResult InvokeWaitStrategy(long sequence)
{
return _waitStrategy.WaitFor(sequence, _dependentSequences, _cancellationTokenSource.Token);
}
}
}
26 changes: 0 additions & 26 deletions src/Disruptor.Benchmarks/MultiProducerSequencerBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
using BenchmarkDotNet.Attributes;
using System.Runtime.CompilerServices;
using BenchmarkDotNet.Configs;
using Disruptor.Benchmarks.Reference;

namespace Disruptor.Benchmarks;

public class MultiProducerSequencerBenchmarks
{
private readonly MultiProducerSequencer _sequencer;
private readonly MultiProducerSequencerRef2 _sequencerRef2;

public MultiProducerSequencerBenchmarks()
{
_sequencer = new MultiProducerSequencer(1024, new BusySpinWaitStrategy());
_sequencerRef2 = new MultiProducerSequencerRef2(1024, new BusySpinWaitStrategy());

Sequence = 42;
}
Expand All @@ -28,13 +24,6 @@ public void Invoke()
{
_sequencer.Publish(Sequence);
}

[Benchmark(Baseline = true)]
[MethodImpl(MethodImplOptions.NoInlining)]
public void InvokeRef2()
{
_sequencerRef2.Publish(Sequence);
}
}

public class MultiProducerSequencer_IsAvailable : MultiProducerSequencerBenchmarks
Expand All @@ -45,13 +34,6 @@ public bool Invoke()
{
return _sequencer.IsAvailable(Sequence);
}

[Benchmark(Baseline = true)]
[MethodImpl(MethodImplOptions.NoInlining)]
public bool InvokeRef2()
{
return _sequencerRef2.IsAvailable(Sequence);
}
}

public class MultiProducerSequencer_NextPublish : MultiProducerSequencerBenchmarks
Expand All @@ -63,13 +45,5 @@ public void Invoke()
var sequence = _sequencer.Next();
_sequencer.Publish(sequence);
}

[Benchmark(Baseline = true)]
[MethodImpl(MethodImplOptions.NoInlining)]
public void InvokeRef2()
{
var sequence = _sequencerRef2.Next();
_sequencerRef2.Publish(sequence);
}
}
}
10 changes: 8 additions & 2 deletions src/Disruptor.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Running;
using Disruptor.Benchmarks.WaitStrategies;
using ObjectLayoutInspector;
Expand All @@ -8,9 +9,14 @@ namespace Disruptor.Benchmarks;

public static class Program
{
public static void Main()
public static void Main(string[] args)
{
BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run();
var benchmarkSwitcher = BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly);
benchmarkSwitcher.Run(
config: DefaultConfig.Instance.WithOption(ConfigOptions.JoinSummary, true),
args: args
);

Console.ReadLine();
}

Expand Down
Loading

0 comments on commit e2f8392

Please sign in to comment.