Software Design Blog

Simple solutions to solve complex problems

Broadcasting messages using the composite pattern

Broadcasting Messages

The previous post focused on reading messages from a queue and pushing the messages to a single command for processing.

The problem: how can we notify multiple commands to process the same request? For example, when an order is received, we may want separate commands to 1) send an email, 2) add an audit record and 3) call an external API.

The solution is to use the composite pattern, whereby a group of commands is treated the same way as a single command. The composite fans each message out to every command in the group.

Setup

The core components such as the Queue Monitor were covered in the Message Queue Delivery Strategies post.

Sequential Processing

The CompositeSequentialBroadcastCommand class below implements ICommand to indicate that it exhibits the behaviour of a command. The responsibility of the composite class is to loop through a collection of commands and call each command with the same request.

  
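    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;

    // A command processes a single message of type T.
    public interface ICommand<in T>
    {
        void Execute(T message);
    }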

    public class OrderProcessorCommand : ICommand<OrderModel>
    {
        private readonly int _id;

        public OrderProcessorCommand(int id)
        {
            _id = id;
        }

        public void Execute(OrderModel message)
        {
            if (message == null) throw new ArgumentNullException("message");
            var start = DateTime.Now;
            Thread.Sleep(TimeSpan.FromSeconds(2));
            Console.WriteLine("Processed {0} by worker {1} from {2} to {3}", 
                               message.Name, _id, start.ToString("h:mm:ss"), 
                               DateTime.Now.ToString("h:mm:ss"));
        }
    }

    public class CompositeSequentialBroadcastCommand<T> : ICommand<T>
    {
        private readonly IEnumerable<ICommand<T>> _commands;

        public CompositeSequentialBroadcastCommand(IEnumerable<ICommand<T>> commands)
        {
            if (commands == null) throw new ArgumentNullException("commands");
            _commands = commands.ToList();
        }

        public void Execute(T message)
        {
            foreach (var command in _commands)
            {
                command.Execute(message);
            }
        }
    }

Let's see what happens when 3 sequential workers are configured to process 2 messages.

            using (var queue = new MessageQueue(@".\Private$\Orders"))
            {
                queue.Formatter = new BinaryMessageFormatter();

                var workers = new List<ICommand<OrderModel>>();
                for (var workerId = 1; workerId <= 3; workerId++)
                {
                    workers.Add(new OrderProcessorCommand(workerId));
                }
                
                var command = new CompositeSequentialBroadcastCommand<OrderModel>(workers);
                var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
                queueMonitor.Start();
            }

Processed Order 1 by worker 1 from 6:26:34 to 6:26:36
Processed Order 1 by worker 2 from 6:26:36 to 6:26:38
Processed Order 1 by worker 3 from 6:26:38 to 6:26:40
Processed Order 2 by worker 1 from 6:26:40 to 6:26:42
Processed Order 2 by worker 2 from 6:26:42 to 6:26:44
Processed Order 2 by worker 3 from 6:26:44 to 6:26:46

The output above shows that each worker processed the same message in sequence. If worker 2 threw an exception, worker 3 would not be executed, because the exception propagates out of the loop. This can be a great pattern when the sequence is important. For example, only send the welcome email if the order has been created successfully.
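The stop-on-failure behaviour of the sequential composite can be sketched as follows. This is a minimal illustration rather than code from the download: RecordingCommand, its log list and the string message type are hypothetical stand-ins for real commands and for OrderModel.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface ICommand<in T>
{
    void Execute(T message);
}

public class CompositeSequentialBroadcastCommand<T> : ICommand<T>
{
    private readonly IEnumerable<ICommand<T>> _commands;

    public CompositeSequentialBroadcastCommand(IEnumerable<ICommand<T>> commands)
    {
        if (commands == null) throw new ArgumentNullException("commands");
        _commands = commands.ToList();
    }

    public void Execute(T message)
    {
        // An exception thrown here leaves the loop, so later commands never run.
        foreach (var command in _commands)
        {
            command.Execute(message);
        }
    }
}

// Hypothetical command: records the call, or throws when told to fail.
public class RecordingCommand : ICommand<string>
{
    private readonly string _name;
    private readonly bool _fail;
    private readonly IList<string> _log;

    public RecordingCommand(string name, bool fail, IList<string> log)
    {
        _name = name;
        _fail = fail;
        _log = log;
    }

    public void Execute(string message)
    {
        if (_fail) throw new InvalidOperationException(_name + " failed");
        _log.Add(_name + " handled " + message);
    }
}

public static class Program
{
    public static List<string> RunDemo()
    {
        var log = new List<string>();
        var command = new CompositeSequentialBroadcastCommand<string>(new ICommand<string>[]
        {
            new RecordingCommand("add audit record", false, log),
            new RecordingCommand("create order", true, log),       // fails
            new RecordingCommand("send welcome email", false, log) // never reached
        });

        try
        {
            command.Execute("Order 1");
        }
        catch (InvalidOperationException ex)
        {
            log.Add("stopped: " + ex.Message);
        }
        return log;
    }

    public static void Main()
    {
        foreach (var line in RunDemo()) Console.WriteLine(line);
    }
}
```

In this sketch the welcome email command is never called, because the failure in the order command ends the loop. Whether the caller should catch the exception, as the demo does, or let it bubble up to the queue reader depends on the delivery strategy in use.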

Parallel Processing

The CompositeParallelBroadcastCommand class is similar to the previous example. The only difference is that this class will call the commands in parallel.

  
    // Requires: using System.Threading.Tasks;
    public class CompositeParallelBroadcastCommand<T> : ICommand<T>
    {
        private readonly IEnumerable<ICommand<T>> _commands;

        public CompositeParallelBroadcastCommand(IEnumerable<ICommand<T>> commands)
        {
            if (commands == null) throw new ArgumentNullException("commands");
            _commands = commands.ToList();
        }

        public void Execute(T message)
        {
            Parallel.ForEach(_commands, c => c.Execute(message));
        }
    }

Let's see what happens when 3 parallel workers are configured to process 2 messages.

            using (var queue = new MessageQueue(@".\Private$\Orders"))
            {
                queue.Formatter = new BinaryMessageFormatter();

                var workers = new List<ICommand<OrderModel>>();
                for (var workerId = 1; workerId <= 3; workerId++)
                {
                    workers.Add(new OrderProcessorCommand(workerId));
                }
                
                var command = new CompositeParallelBroadcastCommand<OrderModel>(workers);
                var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
                queueMonitor.Start();
            }

Processed Order 1 by worker 2 from 6:08:36 to 6:08:38
Processed Order 1 by worker 1 from 6:08:36 to 6:08:38
Processed Order 1 by worker 3 from 6:08:36 to 6:08:38
Processed Order 2 by worker 3 from 6:08:38 to 6:08:40
Processed Order 2 by worker 2 from 6:08:38 to 6:08:40
Processed Order 2 by worker 1 from 6:08:38 to 6:08:40

The output above shows that each worker processed the same message concurrently, so both messages were handled in roughly 4 seconds instead of 12. This can be a great pattern when sequencing doesn’t matter. For example, send a welcome email and add an audit record at the same time.
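Failures surface differently in the parallel composite. Parallel.ForEach blocks until the loop has finished and wraps exceptions thrown by the commands in an AggregateException, so a caller that wants the original error has to unwrap it. The sketch below illustrates this under stated assumptions: FailingCommand and the string message type are hypothetical and not part of the download.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public interface ICommand<in T>
{
    void Execute(T message);
}

public class CompositeParallelBroadcastCommand<T> : ICommand<T>
{
    private readonly IEnumerable<ICommand<T>> _commands;

    public CompositeParallelBroadcastCommand(IEnumerable<ICommand<T>> commands)
    {
        if (commands == null) throw new ArgumentNullException("commands");
        _commands = commands.ToList();
    }

    public void Execute(T message)
    {
        // Blocks until the loop completes; command exceptions are
        // rethrown together as a single AggregateException.
        Parallel.ForEach(_commands, c => c.Execute(message));
    }
}

// Hypothetical command that always fails.
public class FailingCommand : ICommand<string>
{
    public void Execute(string message)
    {
        throw new InvalidOperationException("audit failed for " + message);
    }
}

public static class Program
{
    public static List<string> RunDemo()
    {
        var errors = new List<string>();
        var command = new CompositeParallelBroadcastCommand<string>(
            new ICommand<string>[] { new FailingCommand() });

        try
        {
            command.Execute("Order 1");
        }
        catch (AggregateException ex)
        {
            // Flatten unwraps nested aggregates before listing the causes.
            foreach (var inner in ex.Flatten().InnerExceptions)
            {
                errors.Add(inner.GetType().Name + ": " + inner.Message);
            }
        }
        return errors;
    }

    public static void Main()
    {
        foreach (var line in RunDemo()) Console.WriteLine(line);
    }
}
```

Two design points follow from running commands concurrently. A failure in one command does not guarantee that the others ran or were skipped, since Parallel.ForEach only stops scheduling new iterations once an exception occurs. Any state shared between commands also needs to be thread-safe.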

Summary

This post focused on broadcasting a single message to multiple receivers. Because a composite is itself an ICommand, the push behaviour of the Queue Monitor can be changed by passing it a different command, without modifying the Queue Monitor code.
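Since both composites implement ICommand, they can also be nested. The sketch below is one possible arrangement and is not taken from the download: a sequential composite first creates the order, then hands over to a parallel composite that sends the email and writes the audit record together. RecordingCommand and the string message type are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public interface ICommand<in T>
{
    void Execute(T message);
}

public class CompositeSequentialBroadcastCommand<T> : ICommand<T>
{
    private readonly List<ICommand<T>> _commands;

    public CompositeSequentialBroadcastCommand(IEnumerable<ICommand<T>> commands)
    {
        if (commands == null) throw new ArgumentNullException("commands");
        _commands = commands.ToList();
    }

    public void Execute(T message)
    {
        foreach (var command in _commands) command.Execute(message);
    }
}

public class CompositeParallelBroadcastCommand<T> : ICommand<T>
{
    private readonly List<ICommand<T>> _commands;

    public CompositeParallelBroadcastCommand(IEnumerable<ICommand<T>> commands)
    {
        if (commands == null) throw new ArgumentNullException("commands");
        _commands = commands.ToList();
    }

    public void Execute(T message)
    {
        Parallel.ForEach(_commands, c => c.Execute(message));
    }
}

// Hypothetical leaf command; the log is a thread-safe queue because
// some commands run concurrently.
public class RecordingCommand : ICommand<string>
{
    private readonly string _name;
    private readonly ConcurrentQueue<string> _log;

    public RecordingCommand(string name, ConcurrentQueue<string> log)
    {
        _name = name;
        _log = log;
    }

    public void Execute(string message)
    {
        _log.Enqueue(_name + ": " + message);
    }
}

public static class Program
{
    public static List<string> RunDemo()
    {
        var log = new ConcurrentQueue<string>();

        // Step 2: email and audit in parallel.
        var notify = new CompositeParallelBroadcastCommand<string>(new ICommand<string>[]
        {
            new RecordingCommand("send welcome email", log),
            new RecordingCommand("add audit record", log)
        });

        // Step 1 runs first; step 2 only runs if step 1 did not throw.
        ICommand<string> command = new CompositeSequentialBroadcastCommand<string>(
            new ICommand<string>[] { new RecordingCommand("create order", log), notify });

        command.Execute("Order 1");
        return log.ToList();
    }

    public static void Main()
    {
        foreach (var line in RunDemo()) Console.WriteLine(line);
    }
}
```

The order entry is always logged first, while the relative order of the email and audit entries can vary between runs. The consumer only ever sees a single ICommand, which is why the Queue Monitor needs no changes.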

Coming soon: The next post will focus on processing a single request by dispatching messages to a pool of competing consumer processors.