mickdelaney
3/21/2016 - 12:51 PM

masstransit3 filter with windsor scope issue

masstransit3 filter with windsor scope issue

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Castle.MicroKernel.Registration;
using Castle.Windsor;
using MassTransit;
using MassTransit.Internals.Extensions;
using MassTransit.TestFramework;
using MassTransit.Pipeline;
using MassTransit.PipeConfigurators;
using MassTransit.Configurators;
using MassTransit.PipeBuilders;
using NUnit.Framework;

namespace Masstransit
{
    public interface SimpleMessageInterface
    {
        string Name { get; }
    }

    public class SimpleMessageClass : SimpleMessageInterface
    {
        public SimpleMessageClass(string name)
        {
            Name = name;
        }

        public string Name { get; set; }
    }
    public static class NHibernateLazyCallContextSessionFilterExtensions
    {
        public static void UseFilterToTestScopeIndepenceSpecification<T>(
            this IPipeConfigurator<T> configurator, IWindsorContainer container) where T : class, PipeContext
        {
            configurator.AddPipeSpecification(
                new FilterToTestScopeIndepenceSpecification<T>(container)
            );
        }
    }
    public class FilterToTestScopeIndepence<T> : IFilter<T> where T : class, PipeContext
    {
        readonly IWindsorContainer _container;

        public FilterToTestScopeIndepence(IWindsorContainer container)
        {
            _container = container;
        }

        void IProbeSite.Probe(ProbeContext context)
        {
            context.CreateFilterScope("FilterToTestScopeIndepence");
        }

        public async Task Send(T context, IPipe<T> next)
        {
            //THIS FAILS, NO SCOPE
            var dependency = _container.Resolve<IScopedDependency>();

            //do some work using dependency
            dependency.CallInFilter("before");

            await next.Send(context);

            dependency.CallInFilter("after");
        }

    }

    public class FilterToTestScopeIndepenceSpecification<T> : IPipeSpecification<T> where T : class, PipeContext
    {
        readonly IWindsorContainer _container;

        public FilterToTestScopeIndepenceSpecification(IWindsorContainer container)
        {
            _container = container;
        }

        public IEnumerable<ValidationResult> Validate()
        {
            return Enumerable.Empty<ValidationResult>();
        }

        public void Apply(IPipeBuilder<T> builder)
        {
            builder.AddFilter(new FilterToTestScopeIndepence<T>(_container));
        }
    }

    public class Using_message_scope_with_two_consumers : InMemoryTestFixture
    {
        [Test]
        public async void Should_receive_a_message_in_scope()
        {
            const string name = "Joe";

            await InputQueueSendEndpoint.Send(new SimpleMessageClass(name));

            var result = await Registry.SecondCompleted.WithCancellation(TestCancellationToken);

            Console.WriteLine(result);
        }

        [TearDown]
        public void Close_container()
        {
            _container.Dispose();
        }

        readonly IWindsorContainer _container;

        public Using_message_scope_with_two_consumers()
        {
            _container = new WindsorContainer();
            
            _container.Register
            (
                Component.For<FirstConsumer>().LifestyleScoped(),
                Component.For<SecondConsumer>().LifestyleScoped(),
                Component.For<IScopedDependency>().ImplementedBy<Dependency>().LifestyleScoped()
            );
        }

        protected override void ConfigureBus(IInMemoryBusFactoryConfigurator configurator)
        {
        }

        protected override void ConfigureInputQueueEndpoint(IInMemoryReceiveEndpointConfigurator configurator)
        {
            configurator.UseFilterToTestScopeIndepenceSpecification(_container);

            //configurator.UseMessageScope();
            //configurator.Consumer(new WindsorConsumerFactory<FirstConsumer>(_container.Kernel));
            //configurator.Consumer(new WindsorConsumerFactory<SecondConsumer>(_container.Kernel));

            configurator.LoadFrom(_container);
        }


    }


    public class FirstConsumer : IConsumer<SimpleMessageInterface>
    {
        readonly IScopedDependency _depedency;

        public FirstConsumer(IScopedDependency depedency)
        {
            _depedency = depedency;
        }

        public async Task Consume(ConsumeContext<SimpleMessageInterface> context)
        {
            _depedency.CompleteFirst();
        }
    }


    public class SecondConsumer : IConsumer<SimpleMessageInterface>
    {
        readonly IScopedDependency _depedency;

        public SecondConsumer(IScopedDependency depedency)
        {
            _depedency = depedency;
        }

        public async Task Consume(ConsumeContext<SimpleMessageInterface> context)
        {
            _depedency.CompleteSecond();
        }
    }

    public static class Registry
    {
        internal static TaskCompletionSource<string> _firstCompleted = new TaskCompletionSource<string>();
        internal static TaskCompletionSource<string> _secondCompleted = new TaskCompletionSource<string>();

        public static Task<string> FirstCompleted => _firstCompleted.Task;
        public static Task<string> SecondCompleted => _secondCompleted.Task;
    }

    public class Dependency : IScopedDependency
    {
        TaskCompletionSource<string> _filter;
        TaskCompletionSource<string> _first;
        TaskCompletionSource<string> _second;

        public Dependency()
        {
            _filter = new TaskCompletionSource<string>();
            _first = new TaskCompletionSource<string>();
            _second = new TaskCompletionSource<string>();
        }

        public void CallInFilter(string message)
        {
            _filter.TrySetResult(message);
        }

        public void CompleteFirst()
        {
            _first.TrySetResult("First");
            Registry._firstCompleted.TrySetResult(_first.Task.Result);
        }

        public void CompleteSecond()
        {
            _second.TrySetResult("Second");
            Registry._secondCompleted.TrySetResult(_second.Task.Result);

            //if (_first.Task.Status == TaskStatus.RanToCompletion)
            //_completed.TrySetResult(_first.Task.Result);
        }
    }


    public interface IScopedDependency
    {
        void CallInFilter(string message);
        void CompleteFirst();
        void CompleteSecond();
    }
}