C# Observable

Date: 2021-03-11
/// <summary>
/// Handle returned to a subscriber so that it can later cancel its subscription.
/// </summary>
/// <remarks>
/// Extends <see cref="IDisposable"/>, so a subscription can be scoped with a
/// <c>using</c> statement. The <c>Subscription</c> implementation in this file
/// calls <see cref="Unsubscribe"/> from its <c>Dispose</c> method.
/// </remarks>
public interface ISubscription : IDisposable
{
    /// <summary>
    /// Cancels the subscription this handle represents.
    /// </summary>
    void Unsubscribe();
}

/// <summary>
/// A source of <typeparamref name="T"/> values that callers can subscribe to
/// with a plain callback.
/// </summary>
/// <remarks>
/// NOTE(review): this interface shares its name with the BCL type
/// <c>System.IObservable&lt;T&gt;</c> but is a different contract (it takes an
/// <see cref="Action{T}"/> instead of an observer object). Confirm the clash is
/// intentional before mixing it with code that uses the BCL interface.
/// </remarks>
/// <typeparam name="T">Type of the values passed to subscribers.</typeparam>
public interface IObservable<T>
{
    /// <summary>
    /// Registers a callback with this observable.
    /// </summary>
    /// <param name="action">Callback to register.</param>
    /// <returns>A handle that can be used to cancel the registration.</returns>
    ISubscription Subscribe(Action<T> action);
}

Implementation

/// <summary>
/// <see cref="ISubscription"/> implementation that cancels a subscription by
/// invoking a caller-supplied delegate.
/// </summary>
/// <remarks>
/// The delegate is invoked at most once, no matter how many times
/// <see cref="Unsubscribe"/> and <see cref="Dispose()"/> are called or in which
/// order. This matters when the delegate removes an entry from a list by
/// equality: a second invocation could otherwise remove a different, still
/// active registration that happens to compare equal.
/// </remarks>
internal class Subscription : ISubscription
{
    private bool disposedValue;
    private bool unsubscribed;
    private readonly Func<bool> UnsubscribeFn;

    /// <summary>
    /// Creates a subscription handle around the given removal delegate.
    /// </summary>
    /// <param name="unsubscribe">
    /// Delegate that performs the actual removal. Its <c>bool</c> result is ignored.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="unsubscribe"/> is <c>null</c>.
    /// </exception>
    public Subscription(Func<bool> unsubscribe) =>
        UnsubscribeFn = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));

    /// <summary>
    /// Invokes the removal delegate the first time it is called; later calls do nothing.
    /// </summary>
    public void Unsubscribe()
    {
        if (unsubscribed) return;

        // Mark first so a re-entrant call from inside the delegate is also a no-op.
        unsubscribed = true;
        UnsubscribeFn();
    }

    /// <summary>
    /// Core of the dispose pattern; unsubscribes when called from <see cref="Dispose()"/>.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; only then is the
    /// removal delegate (a managed object) touched.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposedValue) return;
        if (disposing)
            Unsubscribe();
        disposedValue = true;
    }

    /// <summary>
    /// Cancels the subscription if it is still active. Safe to call repeatedly.
    /// </summary>
    public void Dispose()
    {
        Dispose(disposing: true);
        // Kept so that a derived type adding a finalizer does not need to re-implement Dispose().
        GC.SuppressFinalize(this);
    }
}

/// <summary>
/// Minimal observable: keeps a list of callbacks and invokes each of them
/// whenever <see cref="Next"/> is called.
/// </summary>
/// <remarks>
/// No synchronization is performed; callbacks run synchronously on the thread
/// that calls <see cref="Next"/>.
/// </remarks>
/// <typeparam name="T">Type of the values pushed to subscribers.</typeparam>
public class Observable<T> : IObservable<T>
{
    private readonly List<Action<T>> subscriptions = new List<Action<T>>();

    /// <summary>
    /// Registers a callback to be invoked on every subsequent <see cref="Next"/>.
    /// </summary>
    /// <param name="subscription">Callback that receives each value.</param>
    /// <returns>
    /// A handle whose <c>Unsubscribe</c>/<c>Dispose</c> removes the callback from this observable.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="subscription"/> is <c>null</c>.
    /// </exception>
    public ISubscription Subscribe(Action<T> subscription)
    {
        // Fail at registration time rather than with a NullReferenceException inside Next.
        if (subscription == null) throw new ArgumentNullException(nameof(subscription));

        subscriptions.Add(subscription);
        return new Subscription(() => subscriptions.Remove(subscription));
    }

    /// <summary>
    /// Passes <paramref name="value"/> to every callback registered at the
    /// moment of the call, in registration order.
    /// </summary>
    /// <remarks>
    /// Callbacks are invoked from a snapshot of the list, so a callback may
    /// subscribe or unsubscribe (itself or others) without invalidating the
    /// iteration. Such changes take effect from the next call; a callback
    /// removed during a notification round may still receive that round's value.
    /// </remarks>
    /// <param name="value">Value to deliver.</param>
    public void Next(T value)
    {
        foreach (var subscription in subscriptions.ToArray())
            subscription(value); // Not exception safe, possible to wrap in Task.Run()
    }
}

Usage

/// <summary>
/// Console demo: two observers with nested lifetimes listen to one observable.
/// </summary>
class Program
{
    /// <summary>
    /// Pushes 10 and 20 while both observers are subscribed, then 30 after
    /// observer B's subscription has been disposed.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var source = new Observable<int>();

        Action<int> reportA = value => Console.WriteLine($"Observer A received: {value}");
        Action<int> reportB = value => Console.WriteLine($"Observer B received: {value}");

        // Outer scope: observer A stays subscribed for all three values.
        using (source.Subscribe(reportA))
        {
            // Inner scope: observer B only sees the values pushed inside it.
            using (source.Subscribe(reportB))
            {
                foreach (var value in new[] { 10, 20 })
                    source.Next(value);
            }

            source.Next(30);
        }
    }
}    
46920cookie-checkC# Observable