ruxo
8/9/2015 - 4:29 AM

Enables reactive-style programming in C# using the Observable pattern.

Enables reactive-style programming in C# using the Observable pattern.

using System;

namespace RZ.Extensions{
    /// <summary>
    /// Adapts an <see cref="Action"/> to <see cref="IDisposable"/>: disposing the proxy runs the action.
    /// </summary>
    /// <remarks>
    /// The action runs at most once. Calls to <see cref="Dispose"/> after the first are ignored,
    /// as the <see cref="IDisposable"/> contract requires. A <c>null</c> action is treated as
    /// "nothing to release", so <see cref="Dispose"/> never fails because of a missing action.
    /// </remarks>
    public class DisposableProxy : IDisposable{
        /// <summary>A shared disposable whose disposal does nothing.</summary>
        public static readonly IDisposable Dummy = new DisposableProxy(delegate {});

        // Not readonly: the first Dispose call swaps this to null so the action cannot run twice.
        Action dispose;

        /// <summary>Creates a disposable that invokes <paramref name="dispose"/> when disposed.</summary>
        /// <param name="dispose">Action to run on the first <see cref="Dispose"/> call; may be <c>null</c>.</param>
        public DisposableProxy(Action dispose){
            this.dispose = dispose;
        }

        /// <summary>
        /// Runs the wrapped action if it has not been run yet. An exception thrown by the action
        /// propagates to the caller; the action is still considered consumed and will not be retried.
        /// </summary>
        public void Dispose(){
            // Atomically take ownership of the action so that repeated (or racing) calls
            // observe null and become no-ops.
            var pending = System.Threading.Interlocked.Exchange(ref dispose, null);
            if (pending != null){
                pending();
            }
        }
    }
    /// <summary>
    /// An <see cref="IObservable{T}"/> whose subscription logic is supplied as a delegate.
    /// </summary>
    /// <typeparam name="T">Type of the values delivered to observers.</typeparam>
    public class ObservableProxy<T> : IObservable<T>
    {
        // Delegate that performs the actual subscription work.
        readonly Func<IObserver<T>, IDisposable> subscribeHandler;

        /// <summary>Wraps <paramref name="subscribe"/> as an observable.</summary>
        /// <param name="subscribe">
        /// Function called for every <see cref="Subscribe"/> call; it receives the observer
        /// and returns the handle that <see cref="Subscribe"/> hands back to the caller.
        /// </param>
        public ObservableProxy(Func<IObserver<T>, IDisposable> subscribe)
        {
            subscribeHandler = subscribe;
        }

        /// <summary>Passes <paramref name="observer"/> to the wrapped delegate.</summary>
        /// <param name="observer">Observer forwarded unchanged to the delegate.</param>
        /// <returns>Whatever the wrapped delegate returns.</returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            IDisposable subscription = subscribeHandler.Invoke(observer);
            return subscription;
        }
    }
    /// <summary>
    /// An <see cref="IObserver{T}"/> whose three notifications are each forwarded to a delegate.
    /// </summary>
    /// <typeparam name="T">Type of the values received.</typeparam>
    public class ObserverProxy<T> : IObserver<T>
    {
        // One handler per IObserver<T> notification.
        readonly Action<T> valueHandler;
        readonly Action<Exception> errorHandler;
        readonly Action completionHandler;

        /// <summary>Creates an observer from the three notification handlers.</summary>
        /// <param name="next">Invoked by <see cref="OnNext"/> with the received value.</param>
        /// <param name="onError">Invoked by <see cref="OnError"/> with the received exception.</param>
        /// <param name="complete">Invoked by <see cref="OnCompleted"/>.</param>
        public ObserverProxy(Action<T> next, Action<Exception> onError, Action complete)
        {
            valueHandler = next;
            errorHandler = onError;
            completionHandler = complete;
        }

        /// <summary>Forwards <paramref name="value"/> to the <c>next</c> handler.</summary>
        public void OnNext(T value)
        {
            valueHandler.Invoke(value);
        }

        /// <summary>Forwards <paramref name="error"/> to the <c>onError</c> handler.</summary>
        public void OnError(Exception error)
        {
            errorHandler.Invoke(error);
        }

        /// <summary>Invokes the <c>complete</c> handler.</summary>
        public void OnCompleted()
        {
            completionHandler.Invoke();
        }
    }
    /// <summary>
    /// Immutable holder for a subscription handle together with an array of observers.
    /// </summary>
    /// <typeparam name="T">Type of the values the observers receive.</typeparam>
    class ObserverSubscription<T>
    {
        /// <summary>The subscription handle; never <c>null</c>.</summary>
        public readonly IDisposable SourceSubscription;

        /// <summary>The observers; never <c>null</c>.</summary>
        public readonly IObserver<T>[] Observers;

        /// <summary>Stores the given values, substituting defaults for omitted or <c>null</c> arguments.</summary>
        /// <param name="subscription">Subscription handle; <see cref="DisposableProxy.Dummy"/> is used when <c>null</c>.</param>
        /// <param name="observers">Observer array; a new empty array is used when <c>null</c>.</param>
        public ObserverSubscription(IDisposable subscription = null, IObserver<T>[] observers = null)
        {
            if (observers != null)
            {
                Observers = observers;
            }
            else
            {
                Observers = new IObserver<T>[0];
            }

            if (subscription != null)
            {
                SourceSubscription = subscription;
            }
            else
            {
                SourceSubscription = DisposableProxy.Dummy;
            }
        }
    }
    /// <summary>
    /// Extension methods that derive new observables from an existing <see cref="IObservable{T}"/>.
    /// </summary>
    public static class ObservableExtension{
        /// <summary>
        /// Creates an observable that forwards only those values of <paramref name="observable"/>
        /// for which <paramref name="predicate"/> returns <c>true</c>. Error and completion
        /// notifications are forwarded unchanged.
        /// </summary>
        /// <remarks>
        /// Each subscription to the returned observable subscribes to <paramref name="observable"/>
        /// and returns the handle produced by that subscription. An exception thrown by
        /// <paramref name="predicate"/> is not caught here, so it propagates to whoever called
        /// <c>OnNext</c> on the source side.
        /// </remarks>
        /// <typeparam name="T">Type of the values.</typeparam>
        /// <param name="observable">Source observable.</param>
        /// <param name="predicate">Test applied to every value.</param>
        /// <returns>The filtered observable.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="observable"/> or <paramref name="predicate"/> is <c>null</c>.
        /// </exception>
        public static IObservable<T> Filter<T>(this IObservable<T> observable, Func<T, bool> predicate){
            // Fail at the call site instead of with a NullReferenceException at subscribe time
            // or on the first value.
            if (observable == null){
                throw new ArgumentNullException("observable");
            }
            if (predicate == null){
                throw new ArgumentNullException("predicate");
            }
            return new ObservableProxy<T>(observer =>
                observable.Subscribe(new ObserverProxy<T>(
                    next: value =>{
                        if (predicate(value)){
                            observer.OnNext(value);
                        }
                    },
                    onError: observer.OnError,
                    complete: observer.OnCompleted)));
        }

        /// <summary>
        /// Creates an observable that delivers <paramref name="mapper"/> applied to each value of
        /// <paramref name="observable"/>. Error and completion notifications are forwarded unchanged.
        /// </summary>
        /// <remarks>
        /// Each subscription to the returned observable subscribes to <paramref name="observable"/>
        /// and returns the handle produced by that subscription. An exception thrown by
        /// <paramref name="mapper"/> is not caught here, so it propagates to whoever called
        /// <c>OnNext</c> on the source side.
        /// </remarks>
        /// <typeparam name="TB">Type of the values produced by <paramref name="mapper"/>.</typeparam>
        /// <typeparam name="TA">Type of the source values.</typeparam>
        /// <param name="observable">Source observable.</param>
        /// <param name="mapper">Transformation applied to every value.</param>
        /// <returns>The mapped observable.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="observable"/> or <paramref name="mapper"/> is <c>null</c>.
        /// </exception>
        public static IObservable<TB> Map<TB, TA>(this IObservable<TA> observable, Func<TA, TB> mapper){
            // Fail at the call site instead of with a NullReferenceException at subscribe time
            // or on the first value.
            if (observable == null){
                throw new ArgumentNullException("observable");
            }
            if (mapper == null){
                throw new ArgumentNullException("mapper");
            }
            return new ObservableProxy<TB>(
                observer => observable.Subscribe(new ObserverProxy<TA>(
                    next: value => observer.OnNext(mapper(value)),
                    onError: observer.OnError,
                    complete: observer.OnCompleted)));
        }
    }
}