{"id":4745,"date":"2021-03-22T11:57:51","date_gmt":"2021-03-22T10:57:51","guid":{"rendered":"https:\/\/solidt.eu\/site\/?p=4745"},"modified":"2021-03-22T22:39:00","modified_gmt":"2021-03-22T21:39:00","slug":"zeromq-loadbalancing","status":"publish","type":"post","link":"https:\/\/solidt.eu\/site\/zeromq-loadbalancing\/","title":{"rendered":"ZeroMQ: Loadbalancing"},"content":{"rendered":"\n<p><a href=\"https:\/\/zguide.zeromq.org\/docs\/chapter3\/#The-Load-Balancing-Pattern\">https:\/\/zguide.zeromq.org\/docs\/chapter3\/#The-Load-Balancing-Pattern<\/a><\/p>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"The-Load-Balancing-Pattern\">The Load Balancing Pattern<\/h2>\n\n\n\n<p>Now let\u2019s look at some code. We\u2019ll see how to connect a ROUTER socket to a REQ socket, and then to a DEALER socket. These two examples follow the same logic, which is a <em>load balancing<\/em> pattern. This pattern is our first exposure to using the ROUTER socket for deliberate routing, rather than simply acting as a reply channel.<\/p>\n\n\n\n<p>The load balancing pattern is very common and we\u2019ll see it several times in this book. It solves the main problem with simple round robin routing (as PUSH and DEALER offer) which is that round robin becomes inefficient if tasks do not all roughly take the same time.<\/p>\n\n\n\n<p>It\u2019s the post office analogy. If you have one queue per counter, and you have some people buying stamps (a fast, simple transaction), and some people opening new accounts (a very slow transaction), then you will find stamp buyers getting unfairly stuck in queues. Just as in a post office, if your messaging architecture is unfair, people will get annoyed.<\/p>\n\n\n\n<p>The solution in the post office is to create a single queue so that even if one or two counters get stuck with slow work, other counters will continue to serve clients on a first-come, first-serve basis.<\/p>\n\n\n\n<p>One reason PUSH and DEALER use the simplistic approach is sheer performance. If you arrive in any major US airport, you\u2019ll find long queues of people waiting at immigration. The border patrol officials will send people in advance to queue up at each counter, rather than using a single queue. Having people walk fifty yards in advance saves a minute or two per passenger. And because every passport check takes roughly the same time, it\u2019s more or less fair. This is the strategy for PUSH and DEALER: send work loads ahead of time so that there is less travel distance.<\/p>\n\n\n\n<p>This is a recurring theme with ZeroMQ: the world\u2019s problems are diverse and you can benefit from solving different problems each in the right way. The airport isn\u2019t the post office and one size fits no one, really well.<\/p>\n\n\n\n<p>Let\u2019s return to the scenario of a worker (DEALER or REQ) connected to a broker (ROUTER). The broker has to know when the worker is ready, and keep a list of workers so that it can take the <em>least recently used<\/em> worker each time.<\/p>\n\n\n\n<p><span style=\"text-decoration: underline;\">The solution is really simple, in fact: workers send a \u201cready\u201d message when they start, and after they finish each task. The broker reads these messages one-by-one. Each time it reads a message, it is from the last used worker. And because we\u2019re using a ROUTER socket, we get an identity that we can then use to send a task back to the worker.<\/span><\/p>\n\n\n\n<p>It\u2019s a twist on request-reply because the task is sent with the reply, and any response for the task is sent as a new request. The following code examples should make it clearer.<\/p>\n\n\n\n<pre class=\"EnlighterJSRAW\" data-enlighter-language=\"csharp\" data-enlighter-theme=\"\" data-enlighter-highlight=\"\" data-enlighter-linenumbers=\"\" data-enlighter-lineoffset=\"\" data-enlighter-title=\"\" data-enlighter-group=\"\">\ufeffusing System;\nusing System.Collections.Generic;\nusing System.Diagnostics;\nusing System.Linq;\nusing System.Text;\nusing System.Threading;\n\nusing ZeroMQ;\n\nnamespace Examples\n{\n\tstatic partial class Program\n\t{\n\t\tstatic int RTReq_Workers = 10;\n\n\t\tpublic static void RTReq(string[] args)\n\t\t{\n\t\t\t\/\/\n\t\t\t\/\/ ROUTER-to-REQ example\n\t\t\t\/\/\n\t\t\t\/\/ While this example runs in a single process, that is only to make\n\t\t\t\/\/ it easier to start and stop the example. Each thread has its own\n\t\t\t\/\/ context and conceptually acts as a separate process.\n\t\t\t\/\/\n\t\t\t\/\/ Author: metadings\n\t\t\t\/\/\n\n\t\t\tusing (var context = new ZContext())\n\t\t\tusing (var broker = new ZSocket(context, ZSocketType.ROUTER))\n\t\t\t{\n\t\t\t\tbroker.Bind(\"tcp:\/\/*:5671\");\n\n\t\t\t\tfor (int i = 0; i &lt; RTReq_Workers; ++i)\n\t\t\t\t{\n\t\t\t\t\tint j = i; new Thread(() => RTReq_Worker(j)).Start();\n\t\t\t\t}\n\n\t\t\t\tvar stopwatch = new Stopwatch();\n\t\t\t\tstopwatch.Start();\n\n\t\t\t\t\/\/ Run for five seconds and then tell workers to end\n\t\t\t\tint workers_fired = 0;\n\t\t\t\twhile (true)\n\t\t\t\t{\n\t\t\t\t\t\/\/ Next message gives us least recently used worker\n\t\t\t\t\tusing (ZMessage identity = broker.ReceiveMessage())\n\t\t\t\t\t{\n\t\t\t\t\t\tbroker.SendMore(identity[0]);\n\t\t\t\t\t\tbroker.SendMore(new ZFrame());\n\n\t\t\t\t\t\t\/\/ Encourage workers until it's time to fire them\n\t\t\t\t\t\tif (stopwatch.Elapsed &lt; TimeSpan.FromSeconds(5))\n\t\t\t\t\t\t{\n\t\t\t\t\t\t\tbroker.Send(new ZFrame(\"Work harder!\"));\n\t\t\t\t\t\t}\n\t\t\t\t\t\telse\n\t\t\t\t\t\t{\n\t\t\t\t\t\t\tbroker.Send(new ZFrame(\"Fired!\"));\n\n\t\t\t\t\t\t\tif (++workers_fired == RTReq_Workers)\n\t\t\t\t\t\t\t{\n\t\t\t\t\t\t\t\tbreak;\n\t\t\t\t\t\t\t}\n\t\t\t\t\t\t}\n\t\t\t\t\t}\n\t\t\t\t}\n\t\t\t}\n\t\t}\n\n\t\tstatic void RTReq_Worker(int i) \n\t\t{\n\t\t\tusing (var context = new ZContext())\n\t\t\tusing (var worker = new ZSocket(context, ZSocketType.REQ))\n\t\t\t{\n\t\t\t\tworker.IdentityString = \"PEER\" + i;\t\/\/ Set a printable identity\n\t\t\t\tworker.Connect(\"tcp:\/\/127.0.0.1:5671\");\n\n\t\t\t\tint total = 0;\n\t\t\t\twhile (true)\n\t\t\t\t{\n\t\t\t\t\t\/\/ Tell the broker we're ready for work\n\t\t\t\t\tworker.Send(new ZFrame(\"Hi Boss\"));\n\n\t\t\t\t\t\/\/ Get workload from broker, until finished\n\t\t\t\t\tusing (ZFrame frame = worker.ReceiveFrame())\n\t\t\t\t\t{\n\t\t\t\t\t\tbool finished = (frame.ReadString() == \"Fired!\");\n\t\t\t\t\t\tif (finished)\n\t\t\t\t\t\t{\n\t\t\t\t\t\t\tbreak;\n\t\t\t\t\t\t}\n\t\t\t\t\t}\n\n\t\t\t\t\ttotal++;\n\n\t\t\t\t\t\/\/ Do some random work\n\t\t\t\t\tThread.Sleep(1);\n\t\t\t\t}\n\n\t\t\t\tConsole.WriteLine(\"Completed: PEER{0}, {1} tasks\", i, total);\n\t\t\t}\n\t\t}\n\t}\n}<\/pre>\n\n\n\n<p><strong>TODO:<\/strong> Use a ConcurrentQueue like in <a href=\"https:\/\/solidt.eu\/site\/c-connectionpool\/\">https:\/\/solidt.eu\/site\/c-connectionpool\/<\/a><\/p>\n\n\n\n<figure class=\"wp-block-image size-large\"><img loading=\"lazy\" decoding=\"async\" width=\"726\" height=\"344\" src=\"https:\/\/solidt.eu\/site\/wp-content\/uploads\/2021\/03\/afbeelding-1.png\" alt=\"\" class=\"wp-image-4748\" srcset=\"https:\/\/solidt.eu\/site\/wp-content\/uploads\/2021\/03\/afbeelding-1.png 726w, https:\/\/solidt.eu\/site\/wp-content\/uploads\/2021\/03\/afbeelding-1-300x142.png 300w\" sizes=\"auto, (max-width: 726px) 100vw, 726px\" \/><\/figure>\n\n\n\n<pre class=\"EnlighterJSRAW\" data-enlighter-language=\"csharp\" data-enlighter-theme=\"\" data-enlighter-highlight=\"\" data-enlighter-linenumbers=\"\" data-enlighter-lineoffset=\"\" data-enlighter-title=\"\" data-enlighter-group=\"\">public static void Main(string[] args)\n{\n    \/\/ NOTES\n    \/\/ 1. Use ThreadLocal&lt;DealerSocket> where each thread has\n    \/\/    its own client DealerSocket to talk to server\n    \/\/ 2. Each thread can send using it own socket\n    \/\/ 3. Each thread socket is added to poller\n    const int delay = 3000; \/\/ millis\n    var clientSocketPerThread = new ThreadLocal&lt;DealerSocket>();\n    using (var server = new RouterSocket(\"@tcp:\/\/127.0.0.1:5556\"))\n    using (var poller = new NetMQPoller())\n    {\n        \/\/ Start some threads, each with its own DealerSocket\n        \/\/ to talk to the server socket. Creates lots of sockets,\n        \/\/ but no nasty race conditions no shared state, each\n        \/\/ thread has its own socket, happy days.\n        for (int i = 0; i &lt; 3; i++)\n        {\n            Task.Factory.StartNew(state =>\n            {\n                DealerSocket client = null;\n                if (!clientSocketPerThread.IsValueCreated)\n                {\n                    client = new DealerSocket();\n                    client.Options.Identity =\n                        Encoding.Unicode.GetBytes(state.ToString());\n                    client.Connect(\"tcp:\/\/127.0.0.1:5556\");\n                    client.ReceiveReady += Client_ReceiveReady;\n                    clientSocketPerThread.Value = client;\n                    poller.Add(client);\n                }\n                else\n                {\n                    client = clientSocketPerThread.Value;\n                }\n                while (true)\n                {\n                    var messageToServer = new NetMQMessage();\n                    messageToServer.AppendEmptyFrame();\n                    messageToServer.Append(state.ToString());\n                    Console.WriteLine(\"======================================\");\n                    Console.WriteLine(\" OUTGOING MESSAGE TO SERVER \");\n                    Console.WriteLine(\"======================================\");\n                    PrintFrames(\"Client Sending\", messageToServer);\n                    client.SendMultipartMessage(messageToServer);\n                    Thread.Sleep(delay);\n                }\n            }, string.Format(\"client {0}\", i), TaskCreationOptions.LongRunning);\n        }\n        \/\/ start the poller\n        poller.RunAsync();\n        \/\/ server loop\n        while (true)\n        {\n            var clientMessage = server.ReceiveMessage();\n            Console.WriteLine(\"======================================\");\n            Console.WriteLine(\" INCOMING CLIENT MESSAGE FROM CLIENT \");\n            Console.WriteLine(\"======================================\");\n            PrintFrames(\"Server receiving\", clientMessage);\n            if (clientMessage.FrameCount == 3)\n            {\n                var clientAddress = clientMessage[0];\n                var clientOriginalMessage = clientMessage[2].ConvertToString();\n                string response = string.Format(\"{0} back from server {1}\",\n                    clientOriginalMessage, DateTime.Now.ToLongTimeString());\n                var messageToClient = new NetMQMessage();\n                messageToClient.Append(clientAddress);\n                messageToClient.AppendEmptyFrame();\n                messageToClient.Append(response);\n                server.SendMultipartMessage(messageToClient);\n            }\n        }\n    }\n}\nvoid PrintFrames(string operationType, NetMQMessage message)\n{\n    for (int i = 0; i &lt; message.FrameCount; i++)\n    {\n        Console.WriteLine(\"{0} Socket : Frame[{1}] = {2}\", operationType, i,\n            message[i].ConvertToString());\n    }\n}\nvoid Client_ReceiveReady(object sender, NetMQSocketEventArgs e)\n{\n    bool hasmore = false;\n    e.Socket.Receive(out hasmore);\n    if (hasmore)\n    {\n        string result = e.Socket.ReceiveFrameString(out hasmore);\n        Console.WriteLine(\"REPLY {0}\", result);\n    }\n}<\/pre>\n\n\n\n<pre class=\"EnlighterJSRAW\" data-enlighter-language=\"csharp\" data-enlighter-theme=\"\" data-enlighter-highlight=\"\" data-enlighter-linenumbers=\"\" data-enlighter-lineoffset=\"\" data-enlighter-title=\"\" data-enlighter-group=\"\">using System;\nusing System.Linq;\nusing System.Collections.Concurrent;\n\npublic class LoadBalancer\n{\n    private readonly ConcurrentQueue&lt;IWorker> Workers = new ConcurrentQueue&lt;IWorker>();\n    private readonly ConcurrentQueue&lt;ITask> Tasks = new ConcurrentQueue&lt;ITask>();\n    public SemaphoreSlim Semaphore = new SemaphoreSlim(1);\n\n    public void WorkerReady(IWorker worker)\n    {\n        Workers.Enqueue(worker);\n        ProcessQueue();\n    }\n\n    public void TaskReceived(ITask task)\n    {\n        Tasks.Enqueue(task);\n        ProcessQueue();\n    }\n\n    public void ProcessQueue()\n    {\n        var success = Semaphore.Wait(0);\n        if (!success) return;\n        try\n        {\n            while(Workers.Any() &amp;&amp; Tasks.Any())\n            {\n                Workers.TryDequeue(out var worker);\n                if (worker == null) continue;\n                Tasks.TryDequeue(out var task);\n                if (task == null)\n                {\n                    WorkerReady(worker);\n                    continue;\n                }\n                worker.Send(task);\n            }\n        }\n        finally\n        {\n            Semaphore.Release();\n        }\n    }\n}\n<\/pre>\n","protected":false},"excerpt":{"rendered":"<p>https:\/\/zguide.zeromq.org\/docs\/chapter3\/#The-Load-Balancing-Pattern The Load Balancing Pattern Now let\u2019s look at some code. We\u2019ll see how to connect a ROUTER socket to a REQ socket, and then to a DEALER socket. These two examples follow the same logic, which is a load balancing pattern. This pattern is our first exposure to using the ROUTER socket for deliberate [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"inline_featured_image":false,"footnotes":""},"categories":[1],"tags":[],"class_list":["post-4745","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/solidt.eu\/site\/wp-json\/wp\/v2\/posts\/4745","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/solidt.eu\/site\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/solidt.eu\/site\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/solidt.eu\/site\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/solidt.eu\/site\/wp-json\/wp\/v2\/comments?post=4745"}],"version-history":[{"count":6,"href":"https:\/\/solidt.eu\/site\/wp-json\/wp\/v2\/posts\/4745\/revisions"}],"predecessor-version":[{"id":4766,"href":"https:\/\/solidt.eu\/site\/wp-json\/wp\/v2\/posts\/4745\/revisions\/4766"}],"wp:attachment":[{"href":"https:\/\/solidt.eu\/site\/wp-json\/wp\/v2\/media?parent=4745"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/solidt.eu\/site\/wp-json\/wp\/v2\/categories?post=4745"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/solidt.eu\/site\/wp-json\/wp\/v2\/tags?post=4745"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}