stephensmitchell
4/17/2018 - 4:54 PM

Partitioning events with Akka.NET (LINQPad - reference Akka.NET and Rx-Main)

Partitioning events with Akka.NET (LINQPad - reference Akka.NET and Rx-Main)

// LINQPad entry point: starts an actor system, pushes a stream of random measures for
// four device ids at a DeviceSplitter actor, then waits for Enter before shutting down.
void Main()
{
	// ActorSystem is IDisposable; the using block makes sure it is shut down when the
	// script finishes instead of being left running in the host process.
	using (var system = ActorSystem.Create("MySystem"))
	{
		var splitter = system.ActorOf(Props.Create(() => new DeviceSplitter()));
		var random = new System.Random();

		// The id array {"a","b","c","d"} is repeated 250000 times and flattened, so the
		// sequence yields 1,000,000 measures. It is lazy: each Measure is created as the
		// foreach below pulls it.
		var measures = Enumerable.Repeat(new[] { "a", "b", "c", "d" }, 250000)
			.SelectMany(ids => ids)
			.Select(id => new Measure(id, random.Next()));

		foreach (var measure in measures)
		{
			splitter.Tell(measure);
		}

		// Tell is fire-and-forget, so keep the process (and the actor system) alive
		// until the user presses Enter; the actors print their results meanwhile.
		Console.ReadLine();
	}
}

/// <summary>
/// Message describing one reading taken from one device.
/// Both properties are assigned once in the constructor and are read-only to callers.
/// </summary>
public class Measure
{
	/// <summary>Identifier of the device the reading belongs to.</summary>
	public string DeviceId { get; private set; }

	/// <summary>The reading itself.</summary>
	public double Value { get; private set; }

	/// <summary>Creates a measure for the given device id and value.</summary>
	/// <param name="deviceId">Identifier of the originating device.</param>
	/// <param name="value">The reading to carry.</param>
	public Measure(string deviceId, double value)
	{
		this.DeviceId = deviceId;
		this.Value = value;
	}
}

/// <summary>
/// Empty type with no members. Nothing in the code visible here creates or handles it.
/// NOTE(review): presumably intended as a "dump state" message for the actors - confirm or remove.
/// </summary>
public class Dump
{
}

/// <summary>
/// Routes every incoming <see cref="Measure"/> to a child <see cref="DeviceActor"/> that is
/// dedicated to the measure's device id. The child is created the first time an id is seen
/// and reused for all later measures with the same id.
/// </summary>
public class DeviceSplitter : ReceiveActor
{
	// Device id -> child actor handling that device. Only accessed from the Receive handler.
	private readonly Dictionary<string, IActorRef> _knownDevices = new Dictionary<string, IActorRef>();

	/// <summary>Registers the handler that partitions measures by device id.</summary>
	public DeviceSplitter()
	{
		Receive<Measure>(m =>
		{
			// Single lookup on the hot path; the child is only created on a miss.
			IActorRef device;
			if (!_knownDevices.TryGetValue(m.DeviceId, out device))
			{
				device = Context.ActorOf(Props.Create(() => new DeviceActor()));
				_knownDevices.Add(m.DeviceId, device);
			}

			device.Tell(m);
		});
	}
}

/// <summary>
/// Receives <see cref="Measure"/> messages and, for every full batch of 5000 consecutive
/// measures, writes the batch size, the first measure's device id and the average value
/// to the console. A trailing partial batch is never printed.
/// </summary>
public class DeviceActor : ReceiveActor
{
	// Number of measures averaged per console line.
	private const int BatchSize = 5000;

	// A plain Subject is sufficient: the only subscriber is attached in the constructor,
	// before any message is handled. A ReplaySubject would additionally retain every
	// measure ever pushed so it could replay them, growing without bound.
	private readonly Subject<Measure> _subject = new Subject<Measure>();

	// Kept so the Rx pipeline can be torn down when the actor stops.
	private readonly IDisposable _subscription;

	/// <summary>Wires the message handler to the batching/averaging pipeline.</summary>
	public DeviceActor()
	{
		Receive<Measure>(m =>
		{
			_subject.OnNext(m);
		});

		// Buffer(count) hands over an IList, so Count and the indexer are used directly.
		_subscription = _subject.Buffer(BatchSize).Subscribe(batch =>
			Console.WriteLine("Average over last {0} from {1} = {2}", batch.Count, batch[0].DeviceId, batch.Average(x => x.Value)));
	}

	/// <summary>Releases the Rx subscription and subject when the actor is stopped.</summary>
	protected override void PostStop()
	{
		// Dispose the subscription (rather than completing the subject) so an incomplete
		// batch is dropped silently, matching the behaviour while the actor is running.
		_subscription.Dispose();
		_subject.Dispose();
		base.PostStop();
	}
}