C# Observable

Date: 2021-03-11
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
/// <summary>
/// Handle for one registration made through <c>IObservable&lt;T&gt;.Subscribe</c>.
/// Extends <see cref="IDisposable"/>, so a handle can be placed in a <c>using</c> statement.
/// </summary>
public interface ISubscription : IDisposable
{
/// <summary>Ends the registration this handle represents.</summary>
void Unsubscribe();
}
/// <summary>
/// Source of values of type <typeparamref name="T"/> that callbacks can be registered with.
/// </summary>
/// <remarks>
/// This interface has the same simple name as <c>System.IObservable&lt;T&gt;</c> but is a
/// separate contract: it takes a plain <see cref="Action{T}"/> rather than an observer object.
/// </remarks>
/// <typeparam name="T">Type of the values handed to registered callbacks.</typeparam>
public interface IObservable<T>
{
/// <summary>Registers a callback with this source.</summary>
/// <param name="action">Callback to register.</param>
/// <returns>A handle that can be used to end the registration.</returns>
ISubscription Subscribe(Action<T> action);
}
/// <summary>
/// Handle for one registration made through <c>IObservable&lt;T&gt;.Subscribe</c>.
/// Extends <see cref="IDisposable"/>, so a handle can be placed in a <c>using</c> statement.
/// </summary>
public interface ISubscription : IDisposable
{
    /// <summary>Ends the registration this handle represents.</summary>
    void Unsubscribe();
}

/// <summary>
/// Source of values of type <typeparamref name="T"/> that callbacks can be registered with.
/// </summary>
/// <typeparam name="T">Type of the values handed to registered callbacks.</typeparam>
public interface IObservable<T>
{
    /// <summary>Registers a callback with this source.</summary>
    /// <param name="action">Callback to register.</param>
    /// <returns>A handle that can be used to end the registration.</returns>
    ISubscription Subscribe(Action<T> action);
}
/// <summary>
/// Handle for one registration made through <c>IObservable&lt;T&gt;.Subscribe</c>.
/// Extends <see cref="IDisposable"/>, so a handle can be placed in a <c>using</c> statement.
/// </summary>
public interface ISubscription : IDisposable
{
    /// <summary>Ends the registration this handle represents.</summary>
    void Unsubscribe();
}

/// <summary>
/// Source of values of type <typeparamref name="T"/> that callbacks can be registered with.
/// </summary>
/// <remarks>
/// This interface has the same simple name as <c>System.IObservable&lt;T&gt;</c> but is a
/// separate contract: it takes a plain <see cref="Action{T}"/> rather than an observer object.
/// </remarks>
/// <typeparam name="T">Type of the values handed to registered callbacks.</typeparam>
public interface IObservable<T>
{
    /// <summary>Registers a callback with this source.</summary>
    /// <param name="action">Callback to register.</param>
    /// <returns>A handle that can be used to end the registration.</returns>
    ISubscription Subscribe(Action<T> action);
}

Implementation

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
/// <summary>
/// <see cref="ISubscription"/> backed by a callback: unsubscribing (or disposing) invokes the
/// delegate supplied at construction.
/// </summary>
internal class Subscription : ISubscription
{
    private bool disposedValue;

    // Set by the first Unsubscribe() so the callback runs at most once.
    private bool unsubscribed;

    private readonly Func<bool> UnsubscribeFn;

    /// <summary>Creates a subscription that runs <paramref name="unsubscribe"/> when it is ended.</summary>
    /// <param name="unsubscribe">Callback that performs the removal; its boolean result is ignored.</param>
    /// <exception cref="ArgumentNullException"><paramref name="unsubscribe"/> is null.</exception>
    public Subscription(Func<bool> unsubscribe) =>
        // Fail at construction instead of with a NullReferenceException inside Dispose().
        UnsubscribeFn = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));

    /// <summary>
    /// Runs the unsubscribe callback the first time it is called; later calls do nothing.
    /// </summary>
    public void Unsubscribe()
    {
        // A removal callback such as List.Remove deletes the first equal entry each time it
        // runs, so a second invocation (e.g. an explicit Unsubscribe() followed by Dispose())
        // could remove an identical delegate registered by someone else.
        if (unsubscribed)
        {
            return;
        }

        unsubscribed = true;
        UnsubscribeFn();
    }

    /// <summary>Core of the dispose pattern; unsubscribes when called from <see cref="Dispose()"/>.</summary>
    /// <param name="disposing">True when invoked from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposedValue)
        {
            return;
        }

        if (disposing)
        {
            Unsubscribe();
        }

        disposedValue = true;
    }

    /// <summary>Ends the subscription; safe to call more than once.</summary>
    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }
}
/// <summary>
/// Minimal observable: keeps a list of callbacks and invokes each of them for every value
/// passed to <see cref="Next"/>. No synchronization is performed here.
/// </summary>
/// <typeparam name="T">Type of the values delivered to subscribers.</typeparam>
public class Observable<T> : IObservable<T>
{
    private readonly ICollection<Action<T>> subscriptions = new List<Action<T>>();

    /// <summary>Registers <paramref name="subscription"/> to be invoked by <see cref="Next"/>.</summary>
    /// <param name="subscription">Callback to invoke for each value.</param>
    /// <returns>A handle whose Unsubscribe/Dispose removes the callback from the list.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public ISubscription Subscribe(Action<T> subscription)
    {
        // Reject null here; otherwise it would only surface as a NullReferenceException in Next().
        if (subscription == null)
        {
            throw new ArgumentNullException(nameof(subscription));
        }

        subscriptions.Add(subscription);
        return new Subscription(() => subscriptions.Remove(subscription));
    }

    /// <summary>Invokes every registered callback with <paramref name="value"/>, in registration order.</summary>
    /// <param name="value">Value to deliver.</param>
    public void Next(T value)
    {
        // Iterate over a copy: a callback may subscribe or unsubscribe while being notified,
        // and modifying the list during enumeration throws InvalidOperationException.
        // Consequence: callbacks removed during this call may still receive this value, and
        // callbacks added during this call first receive the next one.
        var snapshot = new List<Action<T>>(subscriptions);
        foreach (var subscription in snapshot)
        {
            // Not exception safe: an exception thrown by a callback propagates to the caller
            // and the remaining callbacks are skipped. Wrapping in Task.Run() is one option.
            subscription(value);
        }
    }
}
/// <summary>
/// <see cref="ISubscription"/> backed by a callback: unsubscribing (or disposing) invokes the
/// delegate supplied at construction.
/// </summary>
internal class Subscription : ISubscription
{
    private bool disposedValue;

    // Set by the first Unsubscribe() so the callback runs at most once.
    private bool unsubscribed;

    private readonly Func<bool> UnsubscribeFn;

    /// <summary>Creates a subscription that runs <paramref name="unsubscribe"/> when it is ended.</summary>
    /// <param name="unsubscribe">Callback that performs the removal; its boolean result is ignored.</param>
    /// <exception cref="ArgumentNullException"><paramref name="unsubscribe"/> is null.</exception>
    public Subscription(Func<bool> unsubscribe) =>
        // Fail at construction instead of with a NullReferenceException inside Dispose().
        UnsubscribeFn = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));

    /// <summary>
    /// Runs the unsubscribe callback the first time it is called; later calls do nothing.
    /// </summary>
    public void Unsubscribe()
    {
        // A removal callback such as List.Remove deletes the first equal entry each time it
        // runs, so a second invocation (e.g. an explicit Unsubscribe() followed by Dispose())
        // could remove an identical delegate registered by someone else.
        if (unsubscribed)
        {
            return;
        }

        unsubscribed = true;
        UnsubscribeFn();
    }

    /// <summary>Core of the dispose pattern; unsubscribes when called from <see cref="Dispose()"/>.</summary>
    /// <param name="disposing">True when invoked from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposedValue)
        {
            return;
        }

        if (disposing)
        {
            Unsubscribe();
        }

        disposedValue = true;
    }

    /// <summary>Ends the subscription; safe to call more than once.</summary>
    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }
}

/// <summary>
/// Minimal observable: keeps a list of callbacks and invokes each of them for every value
/// passed to <see cref="Next"/>. No synchronization is performed here.
/// </summary>
/// <typeparam name="T">Type of the values delivered to subscribers.</typeparam>
public class Observable<T> : IObservable<T>
{
    private readonly ICollection<Action<T>> subscriptions = new List<Action<T>>();

    /// <summary>Registers <paramref name="subscription"/> to be invoked by <see cref="Next"/>.</summary>
    /// <param name="subscription">Callback to invoke for each value.</param>
    /// <returns>A handle whose Unsubscribe/Dispose removes the callback from the list.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public ISubscription Subscribe(Action<T> subscription)
    {
        // Reject null here; otherwise it would only surface as a NullReferenceException in Next().
        if (subscription == null)
        {
            throw new ArgumentNullException(nameof(subscription));
        }

        subscriptions.Add(subscription);
        return new Subscription(() => subscriptions.Remove(subscription));
    }

    /// <summary>Invokes every registered callback with <paramref name="value"/>, in registration order.</summary>
    /// <param name="value">Value to deliver.</param>
    public void Next(T value)
    {
        // Iterate over a copy: a callback may subscribe or unsubscribe while being notified,
        // and modifying the list during enumeration throws InvalidOperationException.
        // Consequence: callbacks removed during this call may still receive this value, and
        // callbacks added during this call first receive the next one.
        var snapshot = new List<Action<T>>(subscriptions);
        foreach (var subscription in snapshot)
        {
            // Not exception safe: an exception thrown by a callback propagates to the caller
            // and the remaining callbacks are skipped. Wrapping in Task.Run() is one option.
            subscription(value);
        }
    }
}
/// <summary>
/// <see cref="ISubscription"/> backed by a callback: unsubscribing (or disposing) invokes the
/// delegate supplied at construction.
/// </summary>
internal class Subscription : ISubscription
{
    private bool disposedValue;

    // Set by the first Unsubscribe() so the callback runs at most once.
    private bool unsubscribed;

    private readonly Func<bool> UnsubscribeFn;

    /// <summary>Creates a subscription that runs <paramref name="unsubscribe"/> when it is ended.</summary>
    /// <param name="unsubscribe">Callback that performs the removal; its boolean result is ignored.</param>
    /// <exception cref="ArgumentNullException"><paramref name="unsubscribe"/> is null.</exception>
    public Subscription(Func<bool> unsubscribe) =>
        // Fail at construction instead of with a NullReferenceException inside Dispose().
        UnsubscribeFn = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));

    /// <summary>
    /// Runs the unsubscribe callback the first time it is called; later calls do nothing.
    /// </summary>
    public void Unsubscribe()
    {
        // A removal callback such as List.Remove deletes the first equal entry each time it
        // runs, so a second invocation (e.g. an explicit Unsubscribe() followed by Dispose())
        // could remove an identical delegate registered by someone else.
        if (unsubscribed)
        {
            return;
        }

        unsubscribed = true;
        UnsubscribeFn();
    }

    /// <summary>Core of the dispose pattern; unsubscribes when called from <see cref="Dispose()"/>.</summary>
    /// <param name="disposing">True when invoked from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposedValue)
        {
            return;
        }

        if (disposing)
        {
            Unsubscribe();
        }

        disposedValue = true;
    }

    /// <summary>Ends the subscription; safe to call more than once.</summary>
    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }
}

/// <summary>
/// Minimal observable: keeps a list of callbacks and invokes each of them for every value
/// passed to <see cref="Next"/>. No synchronization is performed here.
/// </summary>
/// <typeparam name="T">Type of the values delivered to subscribers.</typeparam>
public class Observable<T> : IObservable<T>
{
    private readonly ICollection<Action<T>> subscriptions = new List<Action<T>>();

    /// <summary>Registers <paramref name="subscription"/> to be invoked by <see cref="Next"/>.</summary>
    /// <param name="subscription">Callback to invoke for each value.</param>
    /// <returns>A handle whose Unsubscribe/Dispose removes the callback from the list.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public ISubscription Subscribe(Action<T> subscription)
    {
        // Reject null here; otherwise it would only surface as a NullReferenceException in Next().
        if (subscription == null)
        {
            throw new ArgumentNullException(nameof(subscription));
        }

        subscriptions.Add(subscription);
        return new Subscription(() => subscriptions.Remove(subscription));
    }

    /// <summary>Invokes every registered callback with <paramref name="value"/>, in registration order.</summary>
    /// <param name="value">Value to deliver.</param>
    public void Next(T value)
    {
        // Iterate over a copy: a callback may subscribe or unsubscribe while being notified,
        // and modifying the list during enumeration throws InvalidOperationException.
        // Consequence: callbacks removed during this call may still receive this value, and
        // callbacks added during this call first receive the next one.
        var snapshot = new List<Action<T>>(subscriptions);
        foreach (var subscription in snapshot)
        {
            // Not exception safe: an exception thrown by a callback propagates to the caller
            // and the remaining callbacks are skipped. Wrapping in Task.Run() is one option.
            subscription(value);
        }
    }
}

Usage

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
/// <summary>Console demo: two observers with nested lifetimes on one <c>Observable&lt;int&gt;</c>.</summary>
class Program
{
    /// <summary>
    /// Publishes 10 and 20 while observers A and B are both registered, then 30 after B's
    /// <c>using</c> block has ended.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        Observable<int> source = new Observable<int>();
        Action<int> reportToA = state => Console.WriteLine($"Observer A received: {state}");
        Action<int> reportToB = state => Console.WriteLine($"Observer B received: {state}");

        using (source.Subscribe(reportToA))
        {
            using (source.Subscribe(reportToB))
            {
                // Both subscriptions are open inside this scope.
                foreach (int value in new[] { 10, 20 })
                {
                    source.Next(value);
                }
            }

            // The inner using has disposed B's subscription; A's is still open.
            source.Next(30);
        }
    }
}
/// <summary>Console demo: two observers with nested lifetimes on one <c>Observable&lt;int&gt;</c>.</summary>
class Program
{
    /// <summary>
    /// Publishes 10 and 20 while observers A and B are both registered, then 30 after B's
    /// <c>using</c> block has ended.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        Observable<int> source = new Observable<int>();
        Action<int> reportToA = state => Console.WriteLine($"Observer A received: {state}");
        Action<int> reportToB = state => Console.WriteLine($"Observer B received: {state}");

        using (source.Subscribe(reportToA))
        {
            using (source.Subscribe(reportToB))
            {
                // Both subscriptions are open inside this scope.
                foreach (int value in new[] { 10, 20 })
                {
                    source.Next(value);
                }
            }

            // The inner using has disposed B's subscription; A's is still open.
            source.Next(30);
        }
    }
}
/// <summary>Console demo: two observers with nested lifetimes on one <c>Observable&lt;int&gt;</c>.</summary>
class Program
{
    /// <summary>
    /// Publishes 10 and 20 while observers A and B are both registered, then 30 after B's
    /// <c>using</c> block has ended.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        Observable<int> source = new Observable<int>();
        Action<int> reportToA = state => Console.WriteLine($"Observer A received: {state}");
        Action<int> reportToB = state => Console.WriteLine($"Observer B received: {state}");

        using (source.Subscribe(reportToA))
        {
            using (source.Subscribe(reportToB))
            {
                // Both subscriptions are open inside this scope.
                foreach (int value in new[] { 10, 20 })
                {
                    source.Next(value);
                }
            }

            // The inner using has disposed B's subscription; A's is still open.
            source.Next(30);
        }
    }
}
46920cookie-checkC# Observable
0 of 2000 max characters.