ZeroMQ: Loadbalancing

Date: 2021-03-22

https://zguide.zeromq.org/docs/chapter3/#The-Load-Balancing-Pattern

The Load Balancing Pattern

Now let’s look at some code. We’ll see how to connect a ROUTER socket to a REQ socket, and then to a DEALER socket. These two examples follow the same logic, which is a load balancing pattern. This pattern is our first exposure to using the ROUTER socket for deliberate routing, rather than simply acting as a reply channel.

The load balancing pattern is very common and we’ll see it several times in this book. It solves the main problem with simple round-robin routing (as PUSH and DEALER offer): round robin becomes inefficient if tasks do not all take roughly the same time.

It’s the post office analogy. If you have one queue per counter, and you have some people buying stamps (a fast, simple transaction), and some people opening new accounts (a very slow transaction), then you will find stamp buyers getting unfairly stuck in queues. Just as in a post office, if your messaging architecture is unfair, people will get annoyed.

The solution in the post office is to create a single queue so that even if one or two counters get stuck with slow work, other counters will continue to serve clients on a first-come, first-served basis.
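To put rough numbers on the post office analogy, here is a minimal sketch in plain C# with no ZeroMQ involved. It compares one queue per counter (round robin) with a single shared queue. The class name `DispatchSketch`, its methods, and the task durations are all made up for illustration; "makespan" here just means the time at which the last counter finishes.

```csharp
using System;
using System.Linq;

public static class DispatchSketch
{
    // One queue per counter: task i always goes to worker i % workerCount,
    // no matter how busy that worker already is.
    public static int RoundRobinMakespan(int[] durations, int workerCount)
    {
        var busyUntil = new int[workerCount];
        for (int i = 0; i < durations.Length; i++)
        {
            busyUntil[i % workerCount] += durations[i];
        }
        return busyUntil.Max();
    }

    // Single queue: each task goes to whichever worker becomes free first
    // (ties go to the lowest-numbered worker).
    public static int SingleQueueMakespan(int[] durations, int workerCount)
    {
        var busyUntil = new int[workerCount];
        foreach (int duration in durations)
        {
            int next = Array.IndexOf(busyUntil, busyUntil.Min());
            busyUntil[next] += duration;
        }
        return busyUntil.Max();
    }

    public static void Main()
    {
        // Alternating slow (10) and fast (1) tasks, two workers.
        int[] durations = { 10, 1, 10, 1, 10, 1, 10, 1 };
        Console.WriteLine(RoundRobinMakespan(durations, 2));  // prints 40
        Console.WriteLine(SingleQueueMakespan(durations, 2)); // prints 22
    }
}
```

With round robin, one worker happens to receive every slow task while the other sits idle after four time units. The single queue spreads the slow tasks over both workers.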

One reason PUSH and DEALER use the simplistic approach is sheer performance. If you arrive in any major US airport, you’ll find long queues of people waiting at immigration. The border patrol officials will send people in advance to queue up at each counter, rather than using a single queue. Having people walk fifty yards in advance saves a minute or two per passenger. And because every passport check takes roughly the same time, it’s more or less fair. This is the strategy for PUSH and DEALER: send work loads ahead of time so that there is less travel distance.

This is a recurring theme with ZeroMQ: the world’s problems are diverse and you can benefit from solving different problems each in the right way. The airport isn’t the post office and one size fits no one, really well.

Let’s return to the scenario of a worker (DEALER or REQ) connected to a broker (ROUTER). The broker has to know when the worker is ready, and keep a list of workers so that it can take the least recently used worker each time.

The solution is really simple, in fact: workers send a “ready” message when they start, and after they finish each task. The broker reads these messages one-by-one. Each time it reads a message, it is from the least recently used worker among those waiting. And because we’re using a ROUTER socket, we get an identity that we can then use to send a task back to the worker.
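The bookkeeping described here comes down to a FIFO queue of worker identities. The sketch below is plain C# and not part of any ZeroMQ binding; `ReadyWorkerQueue`, `MarkReady` and `TryTakeNext` are hypothetical names. In the ROUTER-to-REQ example in these notes the broker does not even need such a list, because the ROUTER socket's incoming queue of "ready" messages already plays that role.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: tracks idle workers in the order they reported ready.
public class ReadyWorkerQueue
{
    private readonly Queue<string> ready = new Queue<string>();

    // Called when a "ready" message arrives from the worker with this identity.
    public void MarkReady(string identity)
    {
        ready.Enqueue(identity);
    }

    // Hands out the worker that has been waiting longest (least recently used).
    public bool TryTakeNext(out string identity)
    {
        if (ready.Count == 0)
        {
            identity = string.Empty;
            return false;
        }
        identity = ready.Dequeue();
        return true;
    }
}

public static class ReadyWorkerQueueDemo
{
    public static void Main()
    {
        var queue = new ReadyWorkerQueue();
        queue.MarkReady("PEER0");
        queue.MarkReady("PEER1");

        queue.TryTakeNext(out string first);   // PEER0 has waited longest
        queue.MarkReady("PEER0");              // PEER0 finishes and reports back
        queue.TryTakeNext(out string second);  // PEER1 is now least recently used

        Console.WriteLine(first);   // prints PEER0
        Console.WriteLine(second);  // prints PEER1
    }
}
```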

It’s a twist on request-reply because the task is sent with the reply, and any response for the task is sent as a new request. The following code examples should make it clearer.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;

using ZeroMQ;

namespace Examples
{
	static partial class Program
	{
		static int RTReq_Workers = 10;

		public static void RTReq(string[] args)
		{
			//
			// ROUTER-to-REQ example
			//
			// While this example runs in a single process, that is only to make
			// it easier to start and stop the example. Each thread has its own
			// context and conceptually acts as a separate process.
			//
			// Author: metadings
			//

			using (var context = new ZContext())
			using (var broker = new ZSocket(context, ZSocketType.ROUTER))
			{
				broker.Bind("tcp://*:5671");

				for (int i = 0; i < RTReq_Workers; ++i)
				{
					// Copy the loop variable so each thread captures its own value
					int j = i;
					new Thread(() => RTReq_Worker(j)).Start();
				}

				var stopwatch = new Stopwatch();
				stopwatch.Start();

				// Run for five seconds and then tell workers to end
				int workers_fired = 0;
				while (true)
				{
					// Next message gives us least recently used worker
					using (ZMessage identity = broker.ReceiveMessage())
					{
						broker.SendMore(identity[0]);
						broker.SendMore(new ZFrame());

						// Encourage workers until it's time to fire them
						if (stopwatch.Elapsed < TimeSpan.FromSeconds(5))
						{
							broker.Send(new ZFrame("Work harder!"));
						}
						else
						{
							broker.Send(new ZFrame("Fired!"));

							if (++workers_fired == RTReq_Workers)
							{
								break;
							}
						}
					}
				}
			}
		}

		static void RTReq_Worker(int i) 
		{
			using (var context = new ZContext())
			using (var worker = new ZSocket(context, ZSocketType.REQ))
			{
				worker.IdentityString = "PEER" + i;	// Set a printable identity
				worker.Connect("tcp://127.0.0.1:5671");

				int total = 0;
				while (true)
				{
					// Tell the broker we're ready for work
					worker.Send(new ZFrame("Hi Boss"));

					// Get workload from broker, until finished
					using (ZFrame frame = worker.ReceiveFrame())
					{
						bool finished = (frame.ReadString() == "Fired!");
						if (finished)
						{
							break;
						}
					}

					total++;

					// Simulate a short unit of work
					Thread.Sleep(1);
				}

				Console.WriteLine("Completed: PEER{0}, {1} tasks", i, total);
			}
		}
	}
}
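The broker loop above sends three frames per task (identity, empty frame, payload) while the worker only ever sends and receives one. The sketch below models that envelope with plain string lists, assuming the standard REQ and ROUTER framing behaviour. `EnvelopeSketch` and its methods are illustrative names, not part of the ZeroMQ bindings, and real frames are byte arrays rather than strings.

```csharp
using System;
using System.Collections.Generic;

public static class EnvelopeSketch
{
    // A REQ socket prepends an empty delimiter frame when it sends, and the
    // ROUTER prepends the sender's identity when it receives. So one frame
    // from the worker shows up as three frames at the broker.
    public static List<string> AsSeenByRouter(string identity, string body)
    {
        return new List<string> { identity, "", body };
    }

    // To answer, the broker reuses the identity frame, keeps the empty
    // delimiter, and replaces the body with the task.
    public static List<string> BuildReply(List<string> received, string task)
    {
        return new List<string> { received[0], "", task };
    }

    // The ROUTER consumes the identity frame for routing and the REQ socket
    // strips the delimiter, so the worker sees only the body.
    public static string AsSeenByReq(List<string> sentByRouter)
    {
        return sentByRouter[2];
    }

    public static void Main()
    {
        List<string> received = AsSeenByRouter("PEER3", "Hi Boss");
        List<string> reply = BuildReply(received, "Work harder!");
        Console.WriteLine(received.Count);      // prints 3
        Console.WriteLine(AsSeenByReq(reply));  // prints Work harder!
    }
}
```

This is why the broker calls `SendMore` twice before the final `Send`, and why the worker can get by with a single `ReceiveFrame`.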

TODO: Use a ConcurrentQueue like in https://solidt.eu/site/c-connectionpool/

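using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

public static void Main(string[] args)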
{
    // NOTES
    // 1. Use ThreadLocal<DealerSocket> where each thread has
    //    its own client DealerSocket to talk to server
    // 2. Each thread can send using its own socket
    // 3. Each thread socket is added to poller
    const int delay = 3000; // millis
    var clientSocketPerThread = new ThreadLocal<DealerSocket>();
    using (var server = new RouterSocket("@tcp://127.0.0.1:5556"))
    using (var poller = new NetMQPoller())
    {
        // Start some threads, each with its own DealerSocket
        // to talk to the server socket. Creates lots of sockets,
        // but no nasty race conditions and no shared state: each
        // thread has its own socket, happy days.
        for (int i = 0; i < 3; i++)
        {
            Task.Factory.StartNew(state =>
            {
                DealerSocket client = null;
                if (!clientSocketPerThread.IsValueCreated)
                {
                    client = new DealerSocket();
                    client.Options.Identity =
                        Encoding.Unicode.GetBytes(state.ToString());
                    client.Connect("tcp://127.0.0.1:5556");
                    client.ReceiveReady += Client_ReceiveReady;
                    clientSocketPerThread.Value = client;
                    poller.Add(client);
                }
                else
                {
                    client = clientSocketPerThread.Value;
                }
                while (true)
                {
                    var messageToServer = new NetMQMessage();
                    messageToServer.AppendEmptyFrame();
                    messageToServer.Append(state.ToString());
                    Console.WriteLine("======================================");
                    Console.WriteLine(" OUTGOING MESSAGE TO SERVER ");
                    Console.WriteLine("======================================");
                    PrintFrames("Client Sending", messageToServer);
                    client.SendMultipartMessage(messageToServer);
                    Thread.Sleep(delay);
                }
            }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
        }
        // start the poller
        poller.RunAsync();
        // server loop
        while (true)
        {
            var clientMessage = server.ReceiveMultipartMessage();
            Console.WriteLine("======================================");
            Console.WriteLine(" INCOMING CLIENT MESSAGE FROM CLIENT ");
            Console.WriteLine("======================================");
            PrintFrames("Server receiving", clientMessage);
            if (clientMessage.FrameCount == 3)
            {
                var clientAddress = clientMessage[0];
                var clientOriginalMessage = clientMessage[2].ConvertToString();
                string response = string.Format("{0} back from server {1}",
                    clientOriginalMessage, DateTime.Now.ToLongTimeString());
                var messageToClient = new NetMQMessage();
                messageToClient.Append(clientAddress);
                messageToClient.AppendEmptyFrame();
                messageToClient.Append(response);
                server.SendMultipartMessage(messageToClient);
            }
        }
    }
}
static void PrintFrames(string operationType, NetMQMessage message)
{
    for (int i = 0; i < message.FrameCount; i++)
    {
        Console.WriteLine("{0} Socket : Frame[{1}] = {2}", operationType, i,
            message[i].ConvertToString());
    }
}
static void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
    bool hasmore = false;
    // Discard the empty delimiter frame, then read the reply if one follows
    e.Socket.ReceiveFrameBytes(out hasmore);
    if (hasmore)
    {
        string result = e.Socket.ReceiveFrameString(out hasmore);
        Console.WriteLine("REPLY {0}", result);
    }
}
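// Load balancer sketch built on ConcurrentQueue, following the TODO above.
using System;
using System.Linq;
using System.Collections.Concurrent;
using System.Threading; // SemaphoreSlim

// IWorker and ITask are not defined in these notes; the minimal shapes
// below are assumptions so that the class compiles.
public interface ITask { }
public interface IWorker { void Send(ITask task); }
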
public class LoadBalancer
{
    private readonly ConcurrentQueue<IWorker> Workers = new ConcurrentQueue<IWorker>();
    private readonly ConcurrentQueue<ITask> Tasks = new ConcurrentQueue<ITask>();
    public SemaphoreSlim Semaphore = new SemaphoreSlim(1);

    public void WorkerReady(IWorker worker)
    {
        Workers.Enqueue(worker);
        ProcessQueue();
    }

    public void TaskReceived(ITask task)
    {
        Tasks.Enqueue(task);
        ProcessQueue();
    }

    public void ProcessQueue()
    {
        var success = Semaphore.Wait(0);
        if (!success) return;
        try
        {
            while(Workers.Any() && Tasks.Any())
            {
                Workers.TryDequeue(out var worker);
                if (worker == null) continue;
                Tasks.TryDequeue(out var task);
                if (task == null)
                {
                    WorkerReady(worker);
                    continue;
                }
                worker.Send(task);
            }
        }
        finally
        {
            Semaphore.Release();
        }
    }
}