// MassTransit 3 filter with Windsor scope issue
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Castle.MicroKernel.Lifestyle;
using Castle.MicroKernel.Registration;
using Castle.Windsor;
using MassTransit;
using MassTransit.Configurators;
using MassTransit.Internals.Extensions;
using MassTransit.PipeBuilders;
using MassTransit.PipeConfigurators;
using MassTransit.Pipeline;
using MassTransit.TestFramework;
using NUnit.Framework;
namespace Masstransit
{
/// <summary>
/// Message contract used by the test: a message that exposes a read-only name.
/// </summary>
public interface SimpleMessageInterface
{
    /// <summary>Gets the name carried by the message.</summary>
    string Name
    {
        get;
    }
}
/// <summary>
/// Concrete <see cref="SimpleMessageInterface"/> message sent by the test.
/// </summary>
public class SimpleMessageClass : SimpleMessageInterface
{
    /// <summary>Creates a message carrying the given name.</summary>
    /// <param name="name">Value stored in <see cref="Name"/>.</param>
    public SimpleMessageClass(string name)
    {
        this.Name = name;
    }

    /// <summary>Gets or sets the name carried by the message.</summary>
    public string Name
    {
        get;
        set;
    }
}
/// <summary>
/// Configuration extensions that attach the scope-test filter to a pipe.
/// </summary>
public static class NHibernateLazyCallContextSessionFilterExtensions
{
    /// <summary>
    /// Adds a <see cref="FilterToTestScopeIndepenceSpecification{T}"/> built around
    /// <paramref name="container"/> to the pipe being configured.
    /// </summary>
    /// <typeparam name="T">Pipe context type of the configurator.</typeparam>
    /// <param name="configurator">Pipe configurator that receives the specification.</param>
    /// <param name="container">Windsor container handed to the filter.</param>
    public static void UseFilterToTestScopeIndepenceSpecification<T>(
        this IPipeConfigurator<T> configurator, IWindsorContainer container)
        where T : class, PipeContext
    {
        var specification = new FilterToTestScopeIndepenceSpecification<T>(container);

        configurator.AddPipeSpecification(specification);
    }
}
/// <summary>
/// Pipe filter that resolves an <see cref="IScopedDependency"/> from the Windsor container
/// and calls it before and after the remainder of the pipe runs.
/// </summary>
/// <typeparam name="T">Pipe context type the filter is attached to.</typeparam>
public class FilterToTestScopeIndepence<T> : IFilter<T> where T : class, PipeContext
{
    readonly IWindsorContainer _container;

    /// <summary>Creates the filter.</summary>
    /// <param name="container">Container used to resolve <see cref="IScopedDependency"/>.</param>
    public FilterToTestScopeIndepence(IWindsorContainer container)
    {
        _container = container;
    }

    /// <summary>Registers this filter in the probe output under its own name.</summary>
    void IProbeSite.Probe(ProbeContext context)
    {
        context.CreateFilterScope("FilterToTestScopeIndepence");
    }

    /// <summary>
    /// Calls the scoped dependency with "before", forwards the context to the next pipe,
    /// then calls the dependency with "after".
    /// </summary>
    /// <param name="context">Context flowing through the pipe.</param>
    /// <param name="next">The remainder of the pipe.</param>
    public async Task Send(T context, IPipe<T> next)
    {
        // IScopedDependency is registered with LifestyleScoped, so resolving it requires an
        // active Windsor scope. Open one here (BeginScope comes from Castle.MicroKernel.Lifestyle)
        // and keep it alive across next.Send so the "after" call still has a live instance.
        // NOTE(review): consumers resolved downstream while this scope is active presumably
        // share it - confirm against the consumer factory in use.
        using (_container.BeginScope())
        {
            var dependency = _container.Resolve<IScopedDependency>();

            // do some work using dependency
            dependency.CallInFilter("before");

            await next.Send(context);

            dependency.CallInFilter("after");
        }
    }
}
/// <summary>
/// Pipe specification that installs a <see cref="FilterToTestScopeIndepence{T}"/> into the pipe.
/// </summary>
/// <typeparam name="T">Pipe context type of the pipe being built.</typeparam>
public class FilterToTestScopeIndepenceSpecification<T> : IPipeSpecification<T> where T : class, PipeContext
{
    readonly IWindsorContainer _container;

    /// <summary>Creates the specification.</summary>
    /// <param name="container">Container passed on to the filter that <see cref="Apply"/> creates.</param>
    public FilterToTestScopeIndepenceSpecification(IWindsorContainer container)
    {
        _container = container;
    }

    /// <summary>Reports no validation results; this specification has nothing to validate.</summary>
    public IEnumerable<ValidationResult> Validate()
    {
        yield break;
    }

    /// <summary>Adds a new filter instance, built around the stored container, to the pipe.</summary>
    /// <param name="builder">Builder for the pipe under construction.</param>
    public void Apply(IPipeBuilder<T> builder)
    {
        var filter = new FilterToTestScopeIndepence<T>(_container);

        builder.AddFilter(filter);
    }
}
/// <summary>
/// In-memory bus test: sends one <see cref="SimpleMessageClass"/> to the input queue and waits
/// until <see cref="Registry.SecondCompleted"/> completes. The endpoint is configured with the
/// scope-test filter and with the consumers registered in the Windsor container.
/// </summary>
public class Using_message_scope_with_two_consumers : InMemoryTestFixture
{
    readonly IWindsorContainer _container;

    /// <summary>
    /// Builds the container; both consumers and the shared dependency are registered as scoped.
    /// </summary>
    public Using_message_scope_with_two_consumers()
    {
        _container = new WindsorContainer();
        _container.Register
        (
            Component.For<FirstConsumer>().LifestyleScoped(),
            Component.For<SecondConsumer>().LifestyleScoped(),
            Component.For<IScopedDependency>().ImplementedBy<Dependency>().LifestyleScoped()
        );
    }

    /// <summary>
    /// Sends a message and awaits the second consumer's completion signal, bounded by the
    /// fixture's cancellation token.
    /// </summary>
    // Returns Task rather than void so the test runner can await the test and observe
    // exceptions and cancellation instead of losing them in an async void method.
    [Test]
    public async Task Should_receive_a_message_in_scope()
    {
        const string name = "Joe";

        await InputQueueSendEndpoint.Send(new SimpleMessageClass(name));

        var result = await Registry.SecondCompleted.WithCancellation(TestCancellationToken);

        Console.WriteLine(result);
    }

    /// <summary>Disposes the Windsor container.</summary>
    // NOTE(review): the container is created once in the constructor but disposed in a
    // per-test TearDown; harmless while the fixture has a single test - revisit if more are added.
    [TearDown]
    public void Close_container()
    {
        _container.Dispose();
    }

    /// <summary>No bus-level configuration is needed for this test.</summary>
    protected override void ConfigureBus(IInMemoryBusFactoryConfigurator configurator)
    {
    }

    /// <summary>
    /// Adds the scope-test filter to the input queue endpoint, then loads the consumers
    /// from the container.
    /// </summary>
    protected override void ConfigureInputQueueEndpoint(IInMemoryReceiveEndpointConfigurator configurator)
    {
        configurator.UseFilterToTestScopeIndepenceSpecification(_container);

        // Alternative wiring kept for reference:
        //configurator.UseMessageScope();
        //configurator.Consumer(new WindsorConsumerFactory<FirstConsumer>(_container.Kernel));
        //configurator.Consumer(new WindsorConsumerFactory<SecondConsumer>(_container.Kernel));
        configurator.LoadFrom(_container);
    }
}
/// <summary>
/// Consumer of <see cref="SimpleMessageInterface"/> messages that signals completion of the
/// "first" step through its injected <see cref="IScopedDependency"/>.
/// </summary>
public class FirstConsumer : IConsumer<SimpleMessageInterface>
{
    readonly IScopedDependency _scoped;

    /// <summary>Creates the consumer.</summary>
    /// <param name="depedency">Dependency notified when a message is consumed.</param>
    public FirstConsumer(IScopedDependency depedency)
    {
        this._scoped = depedency;
    }

    /// <summary>Marks the first step as completed; the message content itself is not read.</summary>
    /// <param name="context">Consume context of the received message (unused).</param>
    public async Task Consume(ConsumeContext<SimpleMessageInterface> context)
    {
        // No awaits: the method completes synchronously and returns a finished task.
        this._scoped.CompleteFirst();
    }
}
/// <summary>
/// Consumer of <see cref="SimpleMessageInterface"/> messages that signals completion of the
/// "second" step through its injected <see cref="IScopedDependency"/>.
/// </summary>
public class SecondConsumer : IConsumer<SimpleMessageInterface>
{
    readonly IScopedDependency _scoped;

    /// <summary>Creates the consumer.</summary>
    /// <param name="depedency">Dependency notified when a message is consumed.</param>
    public SecondConsumer(IScopedDependency depedency)
    {
        this._scoped = depedency;
    }

    /// <summary>Marks the second step as completed; the message content itself is not read.</summary>
    /// <param name="context">Consume context of the received message (unused).</param>
    public async Task Consume(ConsumeContext<SimpleMessageInterface> context)
    {
        // No awaits: the method completes synchronously and returns a finished task.
        this._scoped.CompleteSecond();
    }
}
/// <summary>
/// Static completion signals shared between the dependency and the test.
/// The completion sources are internal; only their tasks are exposed publicly.
/// </summary>
public static class Registry
{
    internal static TaskCompletionSource<string> _firstCompleted = new TaskCompletionSource<string>();

    internal static TaskCompletionSource<string> _secondCompleted = new TaskCompletionSource<string>();

    /// <summary>Gets the task backed by <c>_firstCompleted</c>.</summary>
    public static Task<string> FirstCompleted
    {
        get { return _firstCompleted.Task; }
    }

    /// <summary>Gets the task backed by <c>_secondCompleted</c>.</summary>
    public static Task<string> SecondCompleted
    {
        get { return _secondCompleted.Task; }
    }
}
/// <summary>
/// <see cref="IScopedDependency"/> implementation that records calls in per-instance completion
/// sources and forwards the first/second completion values to the static <see cref="Registry"/>.
/// </summary>
public class Dependency : IScopedDependency
{
    // One completion source per kind of call, created together with the instance.
    readonly TaskCompletionSource<string> _filter = new TaskCompletionSource<string>();
    readonly TaskCompletionSource<string> _first = new TaskCompletionSource<string>();
    readonly TaskCompletionSource<string> _second = new TaskCompletionSource<string>();

    /// <summary>Records the filter call; only the first message per instance is kept.</summary>
    /// <param name="message">Text supplied by the filter.</param>
    public void CallInFilter(string message)
    {
        _filter.TrySetResult(message);
    }

    /// <summary>Completes the local "first" signal and publishes its value to the registry.</summary>
    public void CompleteFirst()
    {
        _first.TrySetResult("First");

        // The task was completed on the line above, so reading Result does not block.
        var outcome = _first.Task.Result;
        Registry._firstCompleted.TrySetResult(outcome);
    }

    /// <summary>Completes the local "second" signal and publishes its value to the registry.</summary>
    public void CompleteSecond()
    {
        _second.TrySetResult("Second");

        // The task was completed on the line above, so reading Result does not block.
        var outcome = _second.Task.Result;
        Registry._secondCompleted.TrySetResult(outcome);
    }
}
/// <summary>
/// Dependency shared by the filter and the consumers in this test.
/// </summary>
public interface IScopedDependency
{
    /// <summary>Called by the filter around the rest of the pipe.</summary>
    /// <param name="message">Text identifying the call site.</param>
    void CallInFilter(string message);

    /// <summary>Signals that the first consumer has run.</summary>
    void CompleteFirst();

    /// <summary>Signals that the second consumer has run.</summary>
    void CompleteSecond();
}
}