Server Sent Events (SSE)

Date: 2021-04-14

https://www.w3schools.com/html/html5_serversentevents.asp

https://stackoverflow.com/questions/36227565/aspnet-core-server-sent-events-response-flush

https://www.smashingmagazine.com/2018/02/sse-websockets-data-flow-http2/

https://stackoverflow.com/a/58565850

//<body>
//    <script type="text/javascript">

// Open a server-sent-events connection to the /sse endpoint and log everything it reports.
var source = new EventSource("/sse");

// Unnamed events (no "event:" field) are delivered as "message".
source.addEventListener("message", (event) => console.log(event.data));
source.addEventListener("open", (event) => console.log("SSE open", event));
// NOTE(review): EventSource has no built-in "close" event; this listener only runs
// if the server sends an event explicitly named "close".
source.addEventListener("close", (event) => console.log("SSE close", event));
source.addEventListener("error", function (e) {
    // readyState is a property of the EventSource (the event's target), not of the
    // Event object itself, so it has to be read through e.target.
    if (e.target.readyState === EventSource.CLOSED) {
        // Connection was closed.
        console.log("SSE: connection closed");
    } else {
        console.log(e);
    }
}, false);
//    </script>
//</body>
/// <summary>
/// A wrapping integer counter that can be shared between threads.
/// </summary>
/// <remarks>
/// <see cref="GetNext"/> advances the value by one and wraps it modulo <c>resetAt</c>.
/// A <c>resetAt</c> of 0 makes <see cref="GetNext"/> throw <see cref="System.DivideByZeroException"/>.
/// </remarks>
public class Counter 
{
	/// <summary>Creates a counter.</summary>
	/// <param name="start">Initial value reported by <see cref="Current"/>.</param>
	/// <param name="resetAt">Modulus applied after each increment; values returned by <see cref="GetNext"/> are below it.</param>
	public Counter(int start = 0, int resetAt = 100000) {
		counter = start;
		reset = resetAt;
	}

	// Only accessed through Volatile/Interlocked so concurrent callers see a consistent value.
	private int counter;
	private readonly int reset;

	/// <summary>The most recently produced value (or the start value if <see cref="GetNext"/> was never called).</summary>
	public int Current => System.Threading.Volatile.Read(ref counter);

	/// <summary>
	/// Atomically advances the counter and returns the new value.
	/// </summary>
	/// <returns>(previous value + 1) modulo the reset value.</returns>
	public int GetNext() 
	{
		int observed;
		int next;
		do
		{
			observed = System.Threading.Volatile.Read(ref counter);
			next = (observed + 1) % reset;
			// Retry when another thread changed the counter between the read and the swap;
			// a plain "counter += 1; counter %= reset;" would lose or duplicate values here.
		} while (System.Threading.Interlocked.CompareExchange(ref counter, next, observed) != observed);
		return next;
	} 
}

/// <summary>
/// Serves a minimal test page whose script subscribes to /WeatherForecast/stream
/// and logs the received events to the browser console.
/// </summary>
/// <returns>The page markup with content type "text/html".</returns>
[HttpGet("html")]
public ContentResult GetHtml()
{
	// One entry per line of the page; the lines are joined with the platform newline below.
	var pageLines = new[]
	{
		"<body>",
		"<script type=\"text/javascript\">",
		"var source = new EventSource(\"/WeatherForecast/stream\");",
		"source.addEventListener(\"message_a\", (event) => console.log('message_a', event.data, event.lastEventId));",
		"source.addEventListener(\"message\", (event) => console.log(event.data));",
		"source.addEventListener(\"open\", (event) => console.log(\"SSE open\", event));",
		"source.addEventListener(\"close\", (event) => console.log(\"SSE close\", event));",
		"source.addEventListener(\"error\", (event) => console.log(\"SSE error\", event));",
		"</script>",
		"</body>"
	};

	var page = string.Join(Environment.NewLine, pageLines);
	return Content(page, "text/html");
}

// Open SSE responses keyed by an integer id: GetStream adds an entry when a client
// connects and removes it when the request is aborted; SendEvent writes to every entry.
public static ConcurrentDictionary<int, HttpResponse> Responses = new ConcurrentDictionary<int, HttpResponse>();
// Produces the keys under which GetStream registers responses in Responses.
public static Counter ResponseCounter = new Counter();
// Produces the "id:" value of each event emitted by SendEvent.
public static Counter EventCounter = new Counter();

/// <summary>
/// Broadcasts one "message_a" server-sent event, carrying a random JSON payload,
/// to every response currently registered in <c>Responses</c>.
/// </summary>
/// <remarks>
/// The writes are started fire-and-forget, so this method returns before they complete.
/// A response whose write or flush fails is removed from <c>Responses</c> and the
/// exception is written to the console.
/// </remarks>
public static void SendEvent() {
	if (Responses.IsEmpty) return;
	var rng = new Random();
	int delay = 100 * rng.Next(3, 40);
	var json = JsonConvert.SerializeObject(new { random = rng.NextDouble() * 10, delay = delay });
	var i = EventCounter.GetNext();
	// The trailing "\n" entry makes the joined text end with an empty line,
	// which is what terminates an event in the SSE wire format.
	var message = string.Join(Environment.NewLine, new string[] { 
		$"id: {i}",
		"event: message_a",
		$"data: {json}",
		"\n" });
	foreach(var kv in Responses) {
		Task.Run(async () => {
			try
			{
				await kv.Value.WriteAsync(message);
				// Flush explicitly so the event is pushed to the client instead of sitting in a buffer.
				await kv.Value.Body.FlushAsync();
			} 
			catch(Exception ex)
			{   
				// Any failure (disposed response, aborted connection, I/O error) means this
				// client can no longer be served: drop it rather than leaving a dead entry
				// and an unobserved task exception behind.
				Responses.TryRemove(kv.Key, out _);
				Console.WriteLine(ex.ToString()); // Should not occur by default
			}
		});
	}
}

/// <summary>
/// Opens a server-sent-events stream: registers the response in <c>Responses</c> so that
/// <c>SendEvent</c> can write to it, and keeps the request open until the client disconnects.
/// </summary>
/// <returns>A task that completes once the request has been aborted and the response unregistered.</returns>
[HttpGet("stream")]
public async Task GetStream()
{
	var response = Response;

	// Set the header and send it BEFORE registering the response: once SendEvent has
	// written to a registered response, its headers can no longer be modified.
	response.ContentType = "text/event-stream";
	await response.Body.FlushAsync();

	var key = ResponseCounter.GetNext();
	Responses.TryAdd(key, response);

	try
	{
		// Wait asynchronously for the client to go away; unlike blocking on the token's
		// WaitHandle this does not hold a thread-pool thread for every open connection.
		await Task.Delay(System.Threading.Timeout.Infinite, HttpContext.RequestAborted);
	}
	catch (OperationCanceledException)
	{
		// Expected: the request was aborted (client disconnected).
	}
	finally
	{
		Responses.TryRemove(key, out _);
	}
}

Middleware

// Server with middleware
// Inline middleware: requests to "/sse" get an endless event stream (one event every
// five seconds); every other request is passed on to the next middleware.
app.Use(async (context, next) =>
{
    if (context.Request.Path.ToString().Equals("/sse"))
    {
        var response = context.Response;
        // Assigning ContentType overwrites any existing value; Headers.Add would throw
        // if an earlier component had already set the header.
        response.ContentType = "text/event-stream";
// Connection: keep-alive

        // Signalled when the client disconnects; without it the loop never ends.
        var aborted = context.RequestAborted;
        try
        {
            for(var i = 0; !aborted.IsCancellationRequested; ++i)
            {
                // WriteAsync requires `using Microsoft.AspNetCore.Http`
                await response
                    .WriteAsync($"data: Middleware {i} at {DateTime.Now}\r\r", aborted);

                await response.Body.FlushAsync(aborted);
                await Task.Delay(5 * 1000, aborted);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the client closed the connection while we were writing or waiting.
        }

        // The response has already been started; do not run the rest of the pipeline.
        return;
    }

    await next.Invoke();
});
48830cookie-checkServer Sent Events (SSE)