Software Design Blog

Simple solutions to solve complex problems

Dispatching messages with the Competing Consumers Pattern

Broadcasting Messages

The previous post focused on fanning out the same queued message to multiple consumers.

The problem is that a single worker is great at processing a single message at a time but how do we increase the throughput by processing messages faster?

The solution is to use the Competing Consumers Pattern, which enables multiple concurrent workers to process messages received in the same queue. This approach will optimise throughput and balance the workload.

For example, travellers often queue at the airport check-in. Checking in would be a lengthy process if there was a long queue but only one check-in operator. The solution is to employee multiple operators to service the same queue to process it faster.

Download Source Code

Setup

The core components such as the Queue Monitor were covered in the Message Queue Delivery Strategies post. The queue monitor is responsible for reading messages from the queue and pushing the message content to a command to execute.

The OrderProcessorCommand class used in the previous post was modified on line 19 below to simulate each worker taking a different duration to process a request. For example, worker 1 will take 1 second to process a request whereas worker 2 will take 2 seconds to process a request.

  
    public interface ICommand<in T>
    {
        void Execute(T message);
    }

    public class OrderProcessorCommand : ICommand<OrderModel>
    {
        private readonly int _id;

        public OrderProcessorCommand(int id)
        {
            _id = id;
        }

        public void Execute(OrderModel message)
        {
            if (message == null) throw new ArgumentNullException("message");
            var start = DateTime.Now;
            Thread.Sleep(TimeSpan.FromSeconds(_id));
            Console.WriteLine("Processed {0} by worker {1} from {2} to {3}", 
                               message.Name, _id, start.ToString("h:mm:ss"),
                               DateTime.Now.ToString("h:mm:ss"));
        }
    }

Competing Consumers

The pool of available commands are orchestrated using the CompositeCompetingCommandProcessor class.

The class will automatically block the queue monitor on line 33 from retrieving and pushing in the next message when all of the command processors are busy. The queue monitor will be unblocked as soon as the first command processor becomes available.

The WorkingCommandModel is used to associate a command with a task.

  
    public class WorkingCommandModel<T>
    {
        public ICommand<T> Command { get; set; }
        public Task Task { get; set; }
    }

    public class CompositeCompetingCommandProcessor<T> : ICommand<T>
    {
        private readonly IEnumerable<WorkingCommandModel<T>> _workers;

        public CompositeCompetingCommandProcessor(IEnumerable<ICommand<T>> commands)
        {
            if (commands == null) throw new ArgumentNullException("commands");
            _workers = commands.Select(
                          c => new WorkingCommandModel<T> { Command = c }).ToList();
        }

        public void Execute(T message)
        {
            WorkingCommandModel<T> worker = null;

            while (worker == null)
            {
                worker = GetAvailableWorker();
                if (worker == null) WaitForWorkerAvailability();
            }

            worker.Task = Task.Factory.StartNew(() => 
                                         worker.Command.Execute(message), 
                                         TaskCreationOptions.LongRunning);

            // Block new messages from arriving until a worker is available
            if (GetAvailableWorker() == null) WaitForWorkerAvailability();
        }

        private void WaitForWorkerAvailability()
        {
            var tasks = (from w in _workers where w.Task != null select w.Task);
            Task.WaitAny(tasks.ToArray());
        }

        private WorkingCommandModel<T> GetAvailableWorker()
        {
            var worker = _workers.FirstOrDefault(w => 
                                       w.Task == null || w.Task.IsCompleted);
            if (worker != null && worker.Task != null) worker.Task.Dispose();
            return worker;
        }
    }
Let's run the solution using 3 workers to process 9 unique messages in the queue.
  
            using (var queue = new MessageQueue(QueuePath))
            {
                queue.Formatter = new BinaryMessageFormatter();

                // Write 9 messages to the queue
                for (var orderId = 1; orderId <= 9; orderId++)
                {
                    var order = new OrderModel()
                    {
                        Id = orderId,
                        Name = string.Format("Order {0}", orderId)
                    };
                    queue.Send(order);
                }
                
                // Create 3 workers
                var workers = new List<ICommand<OrderModel>>();
                for (var workerId = 1; workerId <= 3; workerId++)
                {
                    workers.Add(new OrderProcessorCommand(workerId));
                }

                // Process the queue
                var command = new CompositeCompetingCommandProcessor<OrderModel>(workers);
                var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
                queueMonitor.Start();
            }
Processed Order 1 by worker 1 from 6:42:48 to 6:42:49
Processed Order 2 by worker 2 from 6:42:48 to 6:42:50
Processed Order 4 by worker 1 from 6:42:49 to 6:42:50
Processed Order 3 by worker 3 from 6:42:48 to 6:42:51
Processed Order 6 by worker 1 from 6:42:50 to 6:42:51
Processed Order 5 by worker 2 from 6:42:50 to 6:42:52
Processed Order 8 by worker 1 from 6:42:51 to 6:42:52
Processed Order 7 by worker 3 from 6:42:51 to 6:42:54
Processed Order 9 by worker 2 from 6:42:52 to 6:42:54

As we can see from the results above, worker 1 processed 4 messages whereas worker 3 only processed 2 messages.

The order can sometimes be important. For example, a reservation system may processes new bookings, updates and cancellations. Let's say the traveller creates a booking and decides to cancel it immediately. When both requests are in a queue processed by multiple workers then there is a potential that the cancellation will be processed before the create.

Summary

A practical example was provided to process messages from a single queue using multiple workers by applying the competing consumers pattern.

The drawback of potentially processing messages out of order was briefly covered. With enough interest, I may create a follow up post to address the order issue using various techniques such as a sequential message convoy, applying sessions and using correlation IDs to preserve delivery order.

The next post will provide an illustration for adding instrumentation to monitor queue processing throughput. The performance of the competing consumers solution is compared with the single worker solution.