Software Design Blog - Competing Consumers Pattern Simple solutions to solve complex problems / http://www.rssboard.org/rss-specification BlogEngine.NET 3.1.1.0 en-US /opml.axd http://www.dotnetblogengine.net/syndication.axd Jay Strydom Software Design Blog 0.000000 0.000000 Dispatching messages with the Competing Consumers Pattern <img class="img-responsive" alt="Broadcasting Messages" src="/pics/banners/CompetingConsumersBlog.jpg"> <br> <p>The <a href="/post/broadcasting-messages-using-the-composite-pattern">previous post</a> focused on fanning out the same queued message to multiple consumers.</p> <p><b>The problem</b> is that a single worker is great at processing a single message at a time but how do we increase the throughput by processing messages faster?</p> <p><b>The solution</b> is to use the Competing Consumers Pattern, which enables multiple concurrent workers to process messages received in the same queue. This approach will optimise throughput and balance the workload.</p> <p>For example, travellers often queue at the airport check-in. Checking in would be a lengthy process if there was a long queue but only one check-in operator. The solution is to employee multiple operators to service the same queue to process it faster.</p> <a class="btn btn-primary btn-sm" role="button" href="/Downloads/MessageQueueExampleCompeting.zip">Download Source Code</a> <h3>Setup</h3> <p>The core components such as the Queue Monitor were covered in the <a href="/post/message-queue-delivery-strategies">Message Queue Delivery Strategies</a> post. The queue monitor is responsible for reading messages from the queue and pushing the message content to a command to execute.</p> <p>The OrderProcessorCommand class used in the previous post was modified on line 19 below to simulate each worker taking a different duration to process a request. For example, worker 1 will take 1 second to process a request whereas worker 2 will take 2 seconds to process a request.</p> <pre class="brush: c-sharp;"> public interface ICommand&lt;in T&gt; { void Execute(T message); } public class OrderProcessorCommand : ICommand&lt;OrderModel&gt; { private readonly int _id; public OrderProcessorCommand(int id) { _id = id; } public void Execute(OrderModel message) { if (message == null) throw new ArgumentNullException("message"); var start = DateTime.Now; Thread.Sleep(TimeSpan.FromSeconds(_id)); Console.WriteLine("Processed {0} by worker {1} from {2} to {3}", message.Name, _id, start.ToString("h:mm:ss"), DateTime.Now.ToString("h:mm:ss")); } } </pre> <h3>Competing Consumers</h3> <p>The pool of available commands are orchestrated using the CompositeCompetingCommandProcessor class.</p> <p>The class will automatically block the queue monitor on line 33 from retrieving and pushing in the next message when all of the command processors are busy. The queue monitor will be unblocked as soon as the first command processor becomes available.</p> <p>The WorkingCommandModel is used to associate a command with a task.</p> <pre class="brush: c-sharp;"> public class WorkingCommandModel&lt;T&gt; { public ICommand&lt;T&gt; Command { get; set; } public Task Task { get; set; } } public class CompositeCompetingCommandProcessor&lt;T&gt; : ICommand&lt;T&gt; { private readonly IEnumerable&lt;WorkingCommandModel&lt;T&gt;&gt; _workers; public CompositeCompetingCommandProcessor(IEnumerable&lt;ICommand&lt;T&gt;&gt; commands) { if (commands == null) throw new ArgumentNullException("commands"); _workers = commands.Select( c =&gt; new WorkingCommandModel&lt;T&gt; { Command = c }).ToList(); } public void Execute(T message) { WorkingCommandModel&lt;T&gt; worker = null; while (worker == null) { worker = GetAvailableWorker(); if (worker == null) WaitForWorkerAvailability(); } worker.Task = Task.Factory.StartNew(() =&gt; worker.Command.Execute(message), TaskCreationOptions.LongRunning); // Block new messages from arriving until a worker is available if (GetAvailableWorker() == null) WaitForWorkerAvailability(); } private void WaitForWorkerAvailability() { var tasks = (from w in _workers where w.Task != null select w.Task); Task.WaitAny(tasks.ToArray()); } private WorkingCommandModel&lt;T&gt; GetAvailableWorker() { var worker = _workers.FirstOrDefault(w =&gt; w.Task == null || w.Task.IsCompleted); if (worker != null &amp;&amp; worker.Task != null) worker.Task.Dispose(); return worker; } } </pre> Let's run the solution using 3 workers to process 9 unique messages in the queue. <pre class="brush: c-sharp;"> using (var queue = new MessageQueue(QueuePath)) { queue.Formatter = new BinaryMessageFormatter(); // Write 9 messages to the queue for (var orderId = 1; orderId &lt;= 9; orderId++) { var order = new OrderModel() { Id = orderId, Name = string.Format("Order {0}", orderId) }; queue.Send(order); } // Create 3 workers var workers = new List&lt;ICommand&lt;OrderModel&gt;&gt;(); for (var workerId = 1; workerId &lt;= 3; workerId++) { workers.Add(new OrderProcessorCommand(workerId)); } // Process the queue var command = new CompositeCompetingCommandProcessor&lt;OrderModel&gt;(workers); var queueMonitor = new QueueMonitor&lt;OrderModel&gt;(queue, command); queueMonitor.Start(); } </pre> <pre>Processed Order 1 by worker 1 from 6:42:48 to 6:42:49 Processed Order 2 by worker 2 from 6:42:48 to 6:42:50 Processed Order 4 by worker 1 from 6:42:49 to 6:42:50 Processed Order 3 by worker 3 from 6:42:48 to 6:42:51 Processed Order 6 by worker 1 from 6:42:50 to 6:42:51 Processed Order 5 by worker 2 from 6:42:50 to 6:42:52 Processed Order 8 by worker 1 from 6:42:51 to 6:42:52 Processed Order 7 by worker 3 from 6:42:51 to 6:42:54 Processed Order 9 by worker 2 from 6:42:52 to 6:42:54 </pre> <p>As we can see from the results above, worker 1 processed 4 messages whereas worker 3 only processed 2 messages.</p> <div class="alert alert-info" role="alert"> <b>Warning:</b> The message order is no longer guaranteed. </div> <p>The order can sometimes be important. For example, a reservation system may processes new bookings, updates and cancellations. Let's say the traveller creates a booking and decides to cancel it immediately. When both requests are in a queue processed by multiple workers then there is a potential that the cancellation will be processed before the create.</p> <h3>Summary</h3> <p>A practical example was provided to process messages from a single queue using multiple workers by applying the competing consumers pattern.</p> <p>The drawback of potentially processing messages out of order was briefly covered. With enough interest, I may create a follow up post to address the order issue using various techniques such as a sequential message convoy, applying sessions and using correlation IDs to preserve delivery order.</p> <p>The <a href="/post/push-queue-processing-boundaries-with-3x-greater-throughput">next post</a> will provide an illustration for adding instrumentation to monitor queue processing throughput. The performance of the competing consumers solution is compared with the single worker solution.</p> /post/dispatching-messages-with-the-competing-consumers-pattern [email protected] /post/dispatching-messages-with-the-competing-consumers-pattern#comment /post.aspx?id=691362b3-5d6b-4969-8536-e6cdb44b7eb3 Thu, 24 Dec 2015 23:27:00 +1300 Competing Consumers Pattern Queues Design Patterns Queues C# .NET Jay Strydom /pingback.axd /post.aspx?id=691362b3-5d6b-4969-8536-e6cdb44b7eb3 1 /trackback.axd?id=691362b3-5d6b-4969-8536-e6cdb44b7eb3 /post/dispatching-messages-with-the-competing-consumers-pattern#comment /syndication.axd?post=691362b3-5d6b-4969-8536-e6cdb44b7eb3