Software Design Blog

Simple solutions to solve complex problems

Broadcasting messages using the composite pattern

Broadcasting Messages

The previous post focused on reading messages from a queue and pushing the messages to a single command for processing.

The problem: how can we notify multiple commands to process the same request? For example, when an order is received, we may want one command to send an email, another to add an audit record and a third to call an external API.

The solution is to use the composite pattern, whereby a group of commands is treated the same way as a single command in order to fan out the messages.


Setup

The core components such as the Queue Monitor were covered in the Message Queue Delivery Strategies post.

Sequential Processing

The CompositeSequentialBroadcastCommand class below implements ICommand to indicate that it exhibits the behaviour of a command. The responsibility of the composite class is to loop through a collection of commands and call each command with the same request.

  
    public interface ICommand<in T>
    {
        void Execute(T message);
    }

    public class OrderProcessorCommand : ICommand<OrderModel>
    {
        private readonly int _id;

        public OrderProcessorCommand(int id)
        {
            _id = id;
        }

        public void Execute(OrderModel message)
        {
            if (message == null) throw new ArgumentNullException("message");
            var start = DateTime.Now;
            Thread.Sleep(TimeSpan.FromSeconds(2));
            Console.WriteLine("Processed {0} by worker {1} from {2} to {3}", 
                               message.Name, _id, start.ToString("h:mm:ss"), 
                               DateTime.Now.ToString("h:mm:ss"));
        }
    }

    public class CompositeSequentialBroadcastCommand<T> : ICommand<T>
    {
        private readonly IEnumerable<ICommand<T>> _commands;

        public CompositeSequentialBroadcastCommand(IEnumerable<ICommand<T>> commands)
        {
            if (commands == null) throw new ArgumentNullException("commands");
            _commands = commands.ToList();
        }

        public void Execute(T message)
        {
            foreach (var command in _commands)
            {
                command.Execute(message);
            }
        }
    }
Let's see what happens when 3 sequential workers are configured to process 2 messages.
  
            using (var queue = new MessageQueue(@".\Private$\Orders"))
            {
                queue.Formatter = new BinaryMessageFormatter();

                var workers = new List<ICommand<OrderModel>>();
                for (var workerId = 1; workerId <= 3; workerId++)
                {
                    workers.Add(new OrderProcessorCommand(workerId));
                }
                
                var command = new CompositeSequentialBroadcastCommand<OrderModel>(workers);
                var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
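                queueMonitor.Start();

                // BeginReceive is asynchronous, so block here to keep the queue
                // open; otherwise the using block disposes it straight away.
                Console.ReadLine();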
            }
Processed Order 1 by worker 1 from 6:26:34 to 6:26:36
Processed Order 1 by worker 2 from 6:26:36 to 6:26:38
Processed Order 1 by worker 3 from 6:26:38 to 6:26:40
Processed Order 2 by worker 1 from 6:26:40 to 6:26:42
Processed Order 2 by worker 2 from 6:26:42 to 6:26:44
Processed Order 2 by worker 3 from 6:26:44 to 6:26:46

The output above shows that each worker processed the same message in sequence. If worker 2 threw an exception, the loop would stop and worker 3 would not be executed. This can be a great pattern when the sequence is important. For example, only send the welcome email if the order has been created successfully.
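The stop-on-failure behaviour can be illustrated with a minimal sketch. RecordingCommand is a hypothetical helper introduced only for this example (it records its id and optionally throws); the composite class is repeated so that the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface ICommand<in T>
{
    void Execute(T message);
}

// Hypothetical helper for this sketch: records its id, then optionally fails.
public class RecordingCommand : ICommand<string>
{
    private readonly int _id;
    private readonly bool _fail;
    private readonly List<int> _log;

    public RecordingCommand(int id, bool fail, List<int> log)
    {
        _id = id;
        _fail = fail;
        _log = log;
    }

    public void Execute(string message)
    {
        _log.Add(_id);
        if (_fail) throw new InvalidOperationException("Worker " + _id + " failed");
    }
}

public class CompositeSequentialBroadcastCommand<T> : ICommand<T>
{
    private readonly IEnumerable<ICommand<T>> _commands;

    public CompositeSequentialBroadcastCommand(IEnumerable<ICommand<T>> commands)
    {
        if (commands == null) throw new ArgumentNullException("commands");
        _commands = commands.ToList();
    }

    public void Execute(T message)
    {
        // An exception thrown by any command leaves the loop immediately.
        foreach (var command in _commands)
        {
            command.Execute(message);
        }
    }
}

public static class SequentialFailureDemo
{
    // Returns the ids of the workers that were called.
    public static List<int> Run()
    {
        var log = new List<int>();
        var composite = new CompositeSequentialBroadcastCommand<string>(new ICommand<string>[]
        {
            new RecordingCommand(1, false, log),
            new RecordingCommand(2, true, log),   // this worker fails
            new RecordingCommand(3, false, log)
        });

        try
        {
            composite.Execute("Order 1");
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
        return log;
    }

    public static void Main()
    {
        Console.WriteLine("Workers called: " + string.Join(", ", Run()));
    }
}
```

Running Main should print "Worker 2 failed" followed by "Workers called: 1, 2", because worker 3 is never reached.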

Parallel Processing

The CompositeParallelBroadcastCommand class is similar to the previous example. The only difference is that this class will call the commands in parallel.

  
    class CompositeParallelBroadcastCommand<T> : ICommand<T>
    {
        private readonly IEnumerable<ICommand<T>> _commands;

        public CompositeParallelBroadcastCommand(IEnumerable<ICommand<T>> commands)
        {
            if (commands == null) throw new ArgumentNullException("commands");
            _commands = commands.ToList();
        }

        public void Execute(T message)
        {
            Parallel.ForEach(_commands, c => c.Execute(message));
        }
    }
Let's see what happens when 3 parallel workers are configured to process 2 messages.
  
            using (var queue = new MessageQueue(@".\Private$\Orders"))
            {
                queue.Formatter = new BinaryMessageFormatter();

                var workers = new List<ICommand<OrderModel>>();
                for (var workerId = 1; workerId <= 3; workerId++)
                {
                    workers.Add(new OrderProcessorCommand(workerId));
                }
                
                var command = new CompositeParallelBroadcastCommand<OrderModel>(workers);
                var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
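                queueMonitor.Start();

                // BeginReceive is asynchronous, so block here to keep the queue
                // open; otherwise the using block disposes it straight away.
                Console.ReadLine();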
            }
Processed Order 1 by worker 2 from 6:08:36 to 6:08:38
Processed Order 1 by worker 1 from 6:08:36 to 6:08:38
Processed Order 1 by worker 3 from 6:08:36 to 6:08:38
Processed Order 2 by worker 3 from 6:08:38 to 6:08:40
Processed Order 2 by worker 2 from 6:08:38 to 6:08:40
Processed Order 2 by worker 1 from 6:08:38 to 6:08:40

The output above shows that each worker processed the same message concurrently. This can be a great pattern when sequencing doesn’t matter. For example, send a welcome email and add an audit record at the same time.
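Failures surface differently in the parallel composite: Parallel.ForEach wraps exceptions thrown by the loop body in an AggregateException. The sketch below is a minimal illustration of that; DelegateCommand is a hypothetical helper introduced only for this example, and the composite class is repeated so that the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public interface ICommand<in T>
{
    void Execute(T message);
}

// Hypothetical helper for this sketch: wraps a delegate as a command.
public class DelegateCommand<T> : ICommand<T>
{
    private readonly Action<T> _action;

    public DelegateCommand(Action<T> action)
    {
        if (action == null) throw new ArgumentNullException("action");
        _action = action;
    }

    public void Execute(T message)
    {
        _action(message);
    }
}

public class CompositeParallelBroadcastCommand<T> : ICommand<T>
{
    private readonly IEnumerable<ICommand<T>> _commands;

    public CompositeParallelBroadcastCommand(IEnumerable<ICommand<T>> commands)
    {
        if (commands == null) throw new ArgumentNullException("commands");
        _commands = commands.ToList();
    }

    public void Execute(T message)
    {
        Parallel.ForEach(_commands, c => c.Execute(message));
    }
}

public static class ParallelFailureDemo
{
    // Returns the number of failures reported by the composite.
    public static int CountFailures()
    {
        var composite = new CompositeParallelBroadcastCommand<string>(new ICommand<string>[]
        {
            new DelegateCommand<string>(m => Console.WriteLine("Emailed " + m)),
            new DelegateCommand<string>(m => { throw new InvalidOperationException("Audit failed for " + m); }),
            new DelegateCommand<string>(m => Console.WriteLine("Called API for " + m))
        });

        try
        {
            composite.Execute("Order 1");
            return 0;
        }
        catch (AggregateException ex)
        {
            // Each failed command contributes one inner exception.
            foreach (var inner in ex.InnerExceptions)
            {
                Console.WriteLine(inner.Message);
            }
            return ex.InnerExceptions.Count;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Failures: " + CountFailures());
    }
}
```

Whether the other two commands run before the failure is observed depends on scheduling, so callers that need every command to be attempted may prefer to catch exceptions inside each command.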

Summary

This post focused on broadcasting a single message to multiple receivers. Because a composite is itself a command, the push behaviour of the Queue Monitor can be changed without modifying the Queue Monitor code.

Coming soon: The next post will focus on processing a single request by dispatching messages to a pool of competing consumer processors.
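Since both composites implement ICommand, they can also be nested. The sketch below is one possible arrangement, not something taken from the post's source code: an order is created first, and only then are the email and audit commands run in parallel. DelegateCommand is a hypothetical helper introduced only for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public interface ICommand<in T>
{
    void Execute(T message);
}

// Hypothetical helper for this sketch: wraps a delegate as a command.
public class DelegateCommand<T> : ICommand<T>
{
    private readonly Action<T> _action;

    public DelegateCommand(Action<T> action)
    {
        if (action == null) throw new ArgumentNullException("action");
        _action = action;
    }

    public void Execute(T message)
    {
        _action(message);
    }
}

public class CompositeSequentialBroadcastCommand<T> : ICommand<T>
{
    private readonly IEnumerable<ICommand<T>> _commands;

    public CompositeSequentialBroadcastCommand(IEnumerable<ICommand<T>> commands)
    {
        if (commands == null) throw new ArgumentNullException("commands");
        _commands = commands.ToList();
    }

    public void Execute(T message)
    {
        foreach (var command in _commands) command.Execute(message);
    }
}

public class CompositeParallelBroadcastCommand<T> : ICommand<T>
{
    private readonly IEnumerable<ICommand<T>> _commands;

    public CompositeParallelBroadcastCommand(IEnumerable<ICommand<T>> commands)
    {
        if (commands == null) throw new ArgumentNullException("commands");
        _commands = commands.ToList();
    }

    public void Execute(T message)
    {
        Parallel.ForEach(_commands, c => c.Execute(message));
    }
}

public static class NestedCompositeDemo
{
    // Returns the steps in the order they were recorded.
    public static string[] Run()
    {
        // Thread-safe log, because the inner composite runs in parallel.
        var log = new ConcurrentQueue<string>();

        var notifications = new CompositeParallelBroadcastCommand<string>(new ICommand<string>[]
        {
            new DelegateCommand<string>(m => log.Enqueue("email " + m)),
            new DelegateCommand<string>(m => log.Enqueue("audit " + m))
        });

        // The parallel composite is just another command in the sequence.
        var pipeline = new CompositeSequentialBroadcastCommand<string>(new ICommand<string>[]
        {
            new DelegateCommand<string>(m => log.Enqueue("create " + m)),
            notifications
        });

        pipeline.Execute("Order 1");
        return log.ToArray();
    }

    public static void Main()
    {
        foreach (var entry in Run()) Console.WriteLine(entry);
    }
}
```

The first entry is always "create Order 1"; the order of the email and audit entries can vary between runs.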

Message Queue Delivery Strategies


The previous post focused on MSMQ fundamentals using a pull approach to retrieve messages from a queue.

This post is the start of a series that covers multiple strategies to push queued messages to clients. The intention of the push approach is to keep clients agnostic of being part of a message-based architecture.

MSMQ technology is used but it is easy enough to change the implementation to use an alternative queuing solution such as Azure Service Bus.


Setup

Setting up MSMQ was covered in the MSMQ fundamentals post.

The following code was used for placing 3 unique OrderModel messages in the queue.

  
    [Serializable]
    public class OrderModel
    {
        public int Id { get; set; }
        public string Name { get; set; }
    }

    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();

        for (var orderId = 1; orderId <= 3; orderId++)
        {
            var order = new OrderModel()
            {
                Id = orderId,
                Name = string.Format("Order {0}", orderId)
            };
            queue.Send(order);
        }
    }

Command Pattern

The problem: how can a messaging infrastructure issue requests to objects without knowing anything about the operation being requested or the receiver of the request?

The solution is to use the command pattern to decouple senders from receivers. A command decouples the object that invokes the operation from the object that knows how to perform the operation.

The generic command interface is displayed below. The OrderProcessorCommand class implements the command interface to indicate that it can accept OrderModel messages, which it will use to simulate an order being processed.

  
    public interface ICommand<in T>
    {
        void Execute(T message);
    }

    public class OrderProcessorCommand : ICommand<OrderModel>
    {
        private readonly int _id;

        public OrderProcessorCommand(int id)
        {
            _id = id;
        }

        public void Execute(OrderModel message)
        {
            if (message == null) throw new ArgumentNullException("message");
            var start = DateTime.Now;

            // Simulate work being performed
            Thread.Sleep(TimeSpan.FromSeconds(2));

            Console.WriteLine("Processed {0} by worker {1} from {2} to {3}", 
                 message.Name, _id, start.ToString("h:mm:ss"), 
                 DateTime.Now.ToString("h:mm:ss"));
        }
    }
Note that the Thread.Sleep call was added to simulate work being performed by the processor.

Queue Monitor

The queue monitor class acts as an orchestrator, which is responsible for listening to the queue for incoming messages and calling a command to execute each message.

When the client calls the Start method, the workflow outlined below runs continuously until the client calls the Stop method:

  1. The BeginReceive method will kick off the queue listening operation.
  2. The ReceiveCompleted event will be raised when a message arrives, which invokes the OnReceiveCompleted handler.
  3. The command will be executed by passing in the message content, and BeginReceive is called again to wait for the next message.
  
    public interface IQueueMonitor : IDisposable
    {
        void Start();
        void Stop();
    }

    public class QueueMonitor<T> : IQueueMonitor
    {
        private readonly MessageQueue _queue;
        private readonly ICommand<T> _command;
        private bool _continue = true;

        public QueueMonitor(MessageQueue queue, ICommand<T> command)
        {
            if (queue == null) throw new ArgumentNullException("queue");
            if (command == null) throw new ArgumentNullException("command");
            _queue = queue;
            _command = command;

            _queue.ReceiveCompleted += OnReceiveCompleted;
        }

        private void OnReceiveCompleted(object sender, 
                             ReceiveCompletedEventArgs receiveCompletedEventArgs)
        {
            var message = _queue.EndReceive(receiveCompletedEventArgs.AsyncResult);
            _command.Execute((T)message.Body);
            if (_continue) _queue.BeginReceive();
        }

        public void Start()
        {
            _continue = true;
            _queue.BeginReceive();
        }

        public void Stop()
        {
            _continue = false;
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool isDisposing)
        {
            if (!isDisposing || _queue == null) return;
            _queue.ReceiveCompleted -= OnReceiveCompleted;
            _queue.Dispose();
        }
    }

Single Receiver

Let's see the gears ticking over by processing the messages on the queue.

  
            using (var queue = new MessageQueue(@".\Private$\Orders"))
            {
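                // The messages were sent with the BinaryMessageFormatter,
                // so the same formatter is needed to read them.
                queue.Formatter = new BinaryMessageFormatter();

                var command = new OrderProcessorCommand(1);
                var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
                queueMonitor.Start();

                // BeginReceive is asynchronous, so block here to keep the queue
                // open; otherwise the using block disposes it straight away.
                Console.ReadLine();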
            }
Processed Order 1 by worker 1 from 6:20:11 to 6:20:13
Processed Order 2 by worker 1 from 6:20:13 to 6:20:15
Processed Order 3 by worker 1 from 6:20:15 to 6:20:17 

The output above shows that the single receiver processed the messages in order, one at a time.

The drawback of the single receiver is its limited throughput, because it processes only one message at a time. For example, at 2 seconds per message the receiver above cannot exceed 30 messages per minute.

Summary

This post demonstrated a generic approach to continually monitoring a queue for new messages and pushing the message content to a command for execution.

The next post will describe how to broadcast a single message across multiple processors.