mickdelaney
6/19/2013 - 3:46 PM

ComplexEventProcessing_Specs.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace MassTransit.AutomatonymousTests
{
    using System.Threading;
    using Automatonymous;
    using Magnum.Extensions;
    using NUnit.Framework;
    using Saga;
    using SubscriptionConfigurators;


    [TestFixture]
    public class ComplexEventProcessing_Specs : MassTransitTestFixture
    {
        #region Fields

        TestStateMachine _machine;
        InMemorySagaRepository<Instance> _repository;

        #endregion

        #region Setup

        protected override void ConfigureSubscriptions(SubscriptionBusServiceConfigurator configurator)
        {
            _machine = new TestStateMachine();
            _repository = new InMemorySagaRepository<Instance>();

            configurator.StateMachineSaga(_machine, _repository, x =>
            {
                x.Correlate(_machine.Start, (i, e) => i.PartitionName == e.PartitionName);
                x.Correlate(_machine.Info, (i, e) => i.PartitionName == e.PartitionName);
                x.Correlate(_machine.Boost, (i, e) => true);
            });
        }

        class Instance :
            SagaStateMachineInstance
        {
            public Instance(Guid correlationId)
            {
                CorrelationId = correlationId;
            }

            protected Instance()
            {
            }

            public State CurrentState { get; set; }
            public Guid CorrelationId { get; set; }
            public IServiceBus Bus { get; set; }

            public string PartitionName { get; set; }
            public int ValueSum { get; set; }
        }


        class TestStateMachine :
            AutomatonymousStateMachine<Instance>
        {
            public TestStateMachine()
            {
                InstanceState(x => x.CurrentState);

                State(() => Running);
                Event(() => Start);
                Event(() => Info);
                Event(() => Boost);

                Initially(
                    When(Start)
                        .Then(HandleStart)
                        .TransitionTo(Running)
                );

                During(Running,
                    When(Info)
                        .Then(HandleInfoEvent)
                    ,
                    When(Boost)
                        .Then(HandleBoost)
                );
            }

            private void HandleStart(Instance inst, StartPartition evt)
            {
                inst.PartitionName = evt.PartitionName;
            }

            private void HandleInfoEvent(Instance inst, InfoEvent evt)
            {
                inst.ValueSum += evt.Value;
            }

            private void HandleBoost(Instance inst, BoostEvent evt)
            {
                inst.ValueSum += evt.Value;
            }

            public State Running { get; private set; }

            public Event<StartPartition> Start { get; private set; }
            public Event<InfoEvent> Info { get; private set; }
            public Event<BoostEvent> Boost { get; private set; }
        }

        class StartPartition
        {
            public StartPartition(string partitionName)
            {
                CorrelationId = Guid.NewGuid();
                PartitionName = partitionName;
            }

            public Guid CorrelationId { get; private set; }
            public string PartitionName { get; set; }
        }

        class InfoEvent
        {
            public InfoEvent(string partitionName, int value = 1)
            {
                PartitionName = partitionName;
                Value = value;
            }

            public string PartitionName { get; set; }
            public int Value { get; set; }
        }

        class BoostEvent
        {
            public BoostEvent(int value = 100)
            {
                Value = value;
            }

            public int Value { get; set; }
        }

        #endregion

        [Test]
        public void Should_sum_values_in_partitions()
        {
            Bus.Publish(new StartPartition("A"));
            Bus.Publish(new StartPartition("B"));

            Bus.Publish(new InfoEvent("A", value: 1));
            Bus.Publish(new InfoEvent("B", value: 1));
            Bus.Publish(new InfoEvent("A", value: 10));
            Bus.Publish(new InfoEvent("B", value: 20));

            Bus.Publish(new BoostEvent());

            Thread.Sleep(8.Seconds());

            var sagas = _repository.Where(new SagaFilter<Instance>(instance => true))
                                   .ToArray();
            
            Assert.AreEqual(2, sagas.Length);
            Assert.AreEqual(111, sagas.Single(s => s.PartitionName == "A").ValueSum);
            Assert.AreEqual(121, sagas.Single(s => s.PartitionName == "B").ValueSum);

        }
    }
}