public interface ISubscription : IDisposable { void Unsubscribe(); } public interface IObservable<T> { ISubscription Subscribe(Action<T> action); }
Implementation
internal class Subscription : ISubscription { private bool disposedValue; private readonly Func<bool> UnsubscribeFn; public Subscription(Func<bool> unsubscribe) => UnsubscribeFn = unsubscribe; public void Unsubscribe() { UnsubscribeFn(); } protected virtual void Dispose(bool disposing) { if (disposedValue) return; if (disposing) Unsubscribe(); disposedValue = true; } public void Dispose() { Dispose(disposing: true); GC.SuppressFinalize(this); } } public class Observable<T> : IObservable<T> { private readonly ICollection<Action<T>> subscriptions = new List<Action<T>>(); public ISubscription Subscribe(Action<T> subscription) { subscriptions.Add(subscription); return new Subscription(() => subscriptions.Remove(subscription)); } public void Next(T value) { foreach (var subscription in subscriptions) subscription(value); // Not exception safe, possible to wrap in Task.Run() } }
Usage
class Program { static void Main(string[] args) { var observable = new Observable<int>(); using (var a = observable.Subscribe((int state) => Console.WriteLine($"Observer A received: {state}"))) { using (var b = observable.Subscribe((int state) => Console.WriteLine($"Observer B received: {state}"))) { observable.Next(10); observable.Next(20); } observable.Next(30); } } }
469200cookie-checkC# Observable