Allow reactive pattern in C# with Observable pattern.
using System;
namespace RZ.Extensions{
public class DisposableProxy : IDisposable{
public static readonly IDisposable Dummy = new DisposableProxy(delegate {});
readonly Action dispose;
public DisposableProxy(Action dispose){
this.dispose = dispose;
}
public void Dispose(){
dispose();
}
}
public class ObservableProxy<T> : IObservable<T> {
readonly Func<IObserver<T>, IDisposable> subscribe;
public ObservableProxy(Func<IObserver<T>,IDisposable> subscribe){
this.subscribe = subscribe;
}
public IDisposable Subscribe(IObserver<T> observer){
return subscribe(observer);
}
}
public class ObserverProxy<T> : IObserver<T>{
readonly Action<T> next;
readonly Action<Exception> onError;
readonly Action complete;
public ObserverProxy(Action<T> next, Action<Exception> onError, Action complete){
this.next = next;
this.onError = onError;
this.complete = complete;
}
public void OnNext(T value){
next(value);
}
public void OnError(Exception error){
onError(error);
}
public void OnCompleted(){
complete();
}
}
class ObserverSubscription<T>{
public readonly IDisposable SourceSubscription;
public readonly IObserver<T>[] Observers;
public ObserverSubscription(IDisposable subscription = null, IObserver<T>[] observers = null){
Observers = observers ?? new IObserver<T>[0];
SourceSubscription = subscription ?? DisposableProxy.Dummy;
}
}
public static class ObservableExtension{
public static IObservable<T> Filter<T>(this IObservable<T> observable, Func<T, bool> predicate){
return new ObservableProxy<T>(observer =>
observable.Subscribe(new ObserverProxy<T>(
next: value =>{
if (predicate(value))
observer.OnNext(value);
},
onError: observer.OnError,
complete: observer.OnCompleted
)));
}
public static IObservable<TB> Map<TB, TA>(this IObservable<TA> observable, Func<TA, TB> mapper){
return new ObservableProxy<TB>(
observer => observable.Subscribe(new ObserverProxy<TA>(
next: value => observer.OnNext(mapper(value)),
onError: observer.OnError,
complete: observer.OnCompleted
)));
}
}
}