public interface ISubscription : IDisposable
{
void Unsubscribe();
}
public interface IObservable<T>
{
ISubscription Subscribe(Action<T> action);
}
public interface ISubscription : IDisposable
{
void Unsubscribe();
}
public interface IObservable<T>
{
ISubscription Subscribe(Action<T> action);
}
public interface ISubscription : IDisposable { void Unsubscribe(); } public interface IObservable<T> { ISubscription Subscribe(Action<T> action); }
Implementation
internal class Subscription : ISubscription
{
private bool disposedValue;
private readonly Func<bool> UnsubscribeFn;
public Subscription(Func<bool> unsubscribe) => UnsubscribeFn = unsubscribe;
public void Unsubscribe()
{
UnsubscribeFn();
}
protected virtual void Dispose(bool disposing)
{
if (disposedValue) return;
if (disposing)
Unsubscribe();
disposedValue = true;
}
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
public class Observable<T> : IObservable<T>
{
private readonly ICollection<Action<T>> subscriptions = new List<Action<T>>();
public ISubscription Subscribe(Action<T> subscription)
{
subscriptions.Add(subscription);
return new Subscription(() => subscriptions.Remove(subscription));
}
public void Next(T value)
{
foreach (var subscription in subscriptions)
subscription(value); // Not exception safe, possible to wrap in Task.Run()
}
}
internal class Subscription : ISubscription
{
private bool disposedValue;
private readonly Func<bool> UnsubscribeFn;
public Subscription(Func<bool> unsubscribe) => UnsubscribeFn = unsubscribe;
public void Unsubscribe()
{
UnsubscribeFn();
}
protected virtual void Dispose(bool disposing)
{
if (disposedValue) return;
if (disposing)
Unsubscribe();
disposedValue = true;
}
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
public class Observable<T> : IObservable<T>
{
private readonly ICollection<Action<T>> subscriptions = new List<Action<T>>();
public ISubscription Subscribe(Action<T> subscription)
{
subscriptions.Add(subscription);
return new Subscription(() => subscriptions.Remove(subscription));
}
public void Next(T value)
{
foreach (var subscription in subscriptions)
subscription(value); // Not exception safe, possible to wrap in Task.Run()
}
}
internal class Subscription : ISubscription { private bool disposedValue; private readonly Func<bool> UnsubscribeFn; public Subscription(Func<bool> unsubscribe) => UnsubscribeFn = unsubscribe; public void Unsubscribe() { UnsubscribeFn(); } protected virtual void Dispose(bool disposing) { if (disposedValue) return; if (disposing) Unsubscribe(); disposedValue = true; } public void Dispose() { Dispose(disposing: true); GC.SuppressFinalize(this); } } public class Observable<T> : IObservable<T> { private readonly ICollection<Action<T>> subscriptions = new List<Action<T>>(); public ISubscription Subscribe(Action<T> subscription) { subscriptions.Add(subscription); return new Subscription(() => subscriptions.Remove(subscription)); } public void Next(T value) { foreach (var subscription in subscriptions) subscription(value); // Not exception safe, possible to wrap in Task.Run() } }
Usage
class Program
{
static void Main(string[] args)
{
var observable = new Observable<int>();
using (var a = observable.Subscribe((int state) => Console.WriteLine($"Observer A received: {state}")))
{
using (var b = observable.Subscribe((int state) => Console.WriteLine($"Observer B received: {state}")))
{
observable.Next(10);
observable.Next(20);
}
observable.Next(30);
}
}
}
class Program
{
static void Main(string[] args)
{
var observable = new Observable<int>();
using (var a = observable.Subscribe((int state) => Console.WriteLine($"Observer A received: {state}")))
{
using (var b = observable.Subscribe((int state) => Console.WriteLine($"Observer B received: {state}")))
{
observable.Next(10);
observable.Next(20);
}
observable.Next(30);
}
}
}
class Program { static void Main(string[] args) { var observable = new Observable<int>(); using (var a = observable.Subscribe((int state) => Console.WriteLine($"Observer A received: {state}"))) { using (var b = observable.Subscribe((int state) => Console.WriteLine($"Observer B received: {state}"))) { observable.Next(10); observable.Next(20); } observable.Next(30); } } }
469200cookie-checkC# Observable