Software Design Blog

Simple solutions to solve complex problems

Push queue processing boundaries with 3x greater throughput

Broadcasting Messages

The problem is how can we measure the queue processing throughput? Or in a production environment, how can we add a performance counter without modifying our existing infrastructure?

The solution is to add a decorator pattern that will act as a proxy between the QueueMonitor class and the actual command processing class to intercept the communication and add instrumentation.

The previous post used the competing consumer pattern to process a queue concurrently. The message queue strategies post used a single processor to process a queue.

In this post, we will add instrumentation to the single queue processor and the competing consumer queue processor to compare performance.

Download Source Code

Setup

The experiment will add 512KB messages to a queue to simulate a decent workload. The intention is to read the messages from the queue and call a File SMTP mailer server that will write the emails to disk as fast as possible.

The core components such as the Queue Monitor were covered in the Message Queue Delivery Strategies post. The queue monitor is responsible for reading messages from the queue and pushing the message content to a command to execute.

The classes that will be used in this experiment are listed below.

  
    public class FileSmtpMailer : IMailer
    {
        private readonly string _path;

        public FileSmtpMailer(string path)
        {
            if (path == null) throw new ArgumentNullException("path");
            _path = path;
        }

        public void Send(string message, string sender, string recipients, string subject)
        {
            using (var client = new SmtpClient())
            {
                // This can be configured in the app.config
                client.DeliveryMethod = SmtpDeliveryMethod.SpecifiedPickupDirectory;
                client.PickupDirectoryLocation = _path;

                using (var mailMessage = new MailMessage(sender, recipients, 
                                                         subject, message))
                {
                    mailMessage.IsBodyHtml = true;
                    client.Send(mailMessage);
                }
            }
        }
    }

    public class MailProcessorCommand : ICommand<OrderMessage>
    {
        private readonly IMailer _mailer;

        public MailProcessorCommand(IMailer mailer)
        {
            if (mailer == null) throw new ArgumentNullException("mailer");
            _mailer = mailer;
        }

        public void Execute(OrderMessage message)
        {
            if (message == null) throw new ArgumentNullException("message");
            _mailer.Send(message.Body, message.Sender, 
                         message.Recipients, message.Subject);
        }
    }

Instrumentation

The TimerCommandDecorator decorator will be placed in between the QueueMonitor and the MailProcessorComamnd in order to measure the performance throughput.

  
    public class TimerCommandDecorator<T> : ICommand<T>
    {
        private readonly ICommand<T> _next;
        private int _messages;
        private Stopwatch _stopwatch;

        public TimerCommandDecorator(ICommand<T> next)
        {
            if (next == null) throw new ArgumentNullException("next");
            _next = next;
        }

        public void Execute(T message)
        {
            if (_stopwatch == null) _stopwatch = Stopwatch.StartNew();

            _next.Execute(message);

            _messages += 1;
            Console.WriteLine("Processing {0} messages took {1} sec", 
                              _messages, _stopwatch.Elapsed.TotalSeconds);
        }
    }

The TimerCommandDecorator implementation can easily be swapped out with a perfmon version that will increment the performance counter for each message.

Single vs Competing Consumers

Let's run the experiment by observing the throughput difference between the single processor and competing consumer receivers by processing 1000 messages.

The single processor version:

  
            using (var queue = new MessageQueue(QueuePath))
            {
                queue.Formatter = new BinaryMessageFormatter();

                var message = new OrderMessage()
                {
                    Recipients = "[email protected]",
                    Sender = "[email protected]",
                    Subject = "Hello World",
                    Body = emailBody // loaded from a file
                };

                // Send messages to the queue
                for (var i = 0; i < 1000; i++)
                {
                    queue.Send(message);
                }               

                var fileMailer = new FileSmtpMailer(@"C:\temp");
                var command = new MailProcessorCommand(fileMailer);
                var timerCommand = new TimerCommandDecorator<OrderMessage>(command);
                var queueMonitor = new QueueMonitor<OrderMessage>(queue, timerCommand);
                queueMonitor.Start();
            }
Processing 1000 messages took 47.3004132 sec

The competing consumer processor version with 10 processors:

  
                // Create 10 processors
                var commands = new List<ICommand<OrderMessage>>();
                for (var i = 0; i < 10; i++)
                {
                    commands.Add(new MailProcessorCommand(fileMailer));
                }

                var command = new CompositeCompetingCommandProcessor<OrderMessage>(commands);
                var timerCommand = new TimerCommandDecorator<OrderMessage>(command);
                var queueMonitor = new QueueMonitor<OrderMessage>(queue, timerCommand);
                queueMonitor.Start();
Processing 1000 messages took 16.241723 sec

We could experiment using various numbers of concurrent receivers to determine how performance will be affected.

The competing consumer pattern is a great option to fully utilize resources but it may have a negative impact on other tenants of the system. For example, if we call an external API with too many requests, it may appear like a denial-of-service attack.

Summary

The advantage of the competing consumers pattern is the flexibility to configure the number of concurrent consumers to produce a balanced outcome. It is however difficult to determine the throughput and performance impact without instrumentation in place to measure resource utilization.

An approach was covered to add performance instrumentation. The performance metrics revealed that the queue processing throughput can be improved significantly using the competing consumer queue processor compared to the single queue processor.

Dispatching messages with the Competing Consumers Pattern

Broadcasting Messages

The previous post focused on fanning out the same queued message to multiple consumers.

The problem is that a single worker is great at processing a single message at a time but how do we increase the throughput by processing messages faster?

The solution is to use the Competing Consumers Pattern, which enables multiple concurrent workers to process messages received in the same queue. This approach will optimise throughput and balance the workload.

For example, travellers often queue at the airport check-in. Checking in would be a lengthy process if there was a long queue but only one check-in operator. The solution is to employee multiple operators to service the same queue to process it faster.

Download Source Code

Setup

The core components such as the Queue Monitor were covered in the Message Queue Delivery Strategies post. The queue monitor is responsible for reading messages from the queue and pushing the message content to a command to execute.

The OrderProcessorCommand class used in the previous post was modified on line 19 below to simulate each worker taking a different duration to process a request. For example, worker 1 will take 1 second to process a request whereas worker 2 will take 2 seconds to process a request.

  
    public interface ICommand<in T>
    {
        void Execute(T message);
    }

    public class OrderProcessorCommand : ICommand<OrderModel>
    {
        private readonly int _id;

        public OrderProcessorCommand(int id)
        {
            _id = id;
        }

        public void Execute(OrderModel message)
        {
            if (message == null) throw new ArgumentNullException("message");
            var start = DateTime.Now;
            Thread.Sleep(TimeSpan.FromSeconds(_id));
            Console.WriteLine("Processed {0} by worker {1} from {2} to {3}", 
                               message.Name, _id, start.ToString("h:mm:ss"),
                               DateTime.Now.ToString("h:mm:ss"));
        }
    }

Competing Consumers

The pool of available commands are orchestrated using the CompositeCompetingCommandProcessor class.

The class will automatically block the queue monitor on line 33 from retrieving and pushing in the next message when all of the command processors are busy. The queue monitor will be unblocked as soon as the first command processor becomes available.

The WorkingCommandModel is used to associate a command with a task.

  
    public class WorkingCommandModel<T>
    {
        public ICommand<T> Command { get; set; }
        public Task Task { get; set; }
    }

    public class CompositeCompetingCommandProcessor<T> : ICommand<T>
    {
        private readonly IEnumerable<WorkingCommandModel<T>> _workers;

        public CompositeCompetingCommandProcessor(IEnumerable<ICommand<T>> commands)
        {
            if (commands == null) throw new ArgumentNullException("commands");
            _workers = commands.Select(
                          c => new WorkingCommandModel<T> { Command = c }).ToList();
        }

        public void Execute(T message)
        {
            WorkingCommandModel<T> worker = null;

            while (worker == null)
            {
                worker = GetAvailableWorker();
                if (worker == null) WaitForWorkerAvailability();
            }

            worker.Task = Task.Factory.StartNew(() => 
                                         worker.Command.Execute(message), 
                                         TaskCreationOptions.LongRunning);

            // Block new messages from arriving until a worker is available
            if (GetAvailableWorker() == null) WaitForWorkerAvailability();
        }

        private void WaitForWorkerAvailability()
        {
            var tasks = (from w in _workers where w.Task != null select w.Task);
            Task.WaitAny(tasks.ToArray());
        }

        private WorkingCommandModel<T> GetAvailableWorker()
        {
            var worker = _workers.FirstOrDefault(w => 
                                       w.Task == null || w.Task.IsCompleted);
            if (worker != null && worker.Task != null) worker.Task.Dispose();
            return worker;
        }
    }
Let's run the solution using 3 workers to process 9 unique messages in the queue.
  
            using (var queue = new MessageQueue(QueuePath))
            {
                queue.Formatter = new BinaryMessageFormatter();

                // Write 9 messages to the queue
                for (var orderId = 1; orderId <= 9; orderId++)
                {
                    var order = new OrderModel()
                    {
                        Id = orderId,
                        Name = string.Format("Order {0}", orderId)
                    };
                    queue.Send(order);
                }
                
                // Create 3 workers
                var workers = new List<ICommand<OrderModel>>();
                for (var workerId = 1; workerId <= 3; workerId++)
                {
                    workers.Add(new OrderProcessorCommand(workerId));
                }

                // Process the queue
                var command = new CompositeCompetingCommandProcessor<OrderModel>(workers);
                var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
                queueMonitor.Start();
            }
Processed Order 1 by worker 1 from 6:42:48 to 6:42:49
Processed Order 2 by worker 2 from 6:42:48 to 6:42:50
Processed Order 4 by worker 1 from 6:42:49 to 6:42:50
Processed Order 3 by worker 3 from 6:42:48 to 6:42:51
Processed Order 6 by worker 1 from 6:42:50 to 6:42:51
Processed Order 5 by worker 2 from 6:42:50 to 6:42:52
Processed Order 8 by worker 1 from 6:42:51 to 6:42:52
Processed Order 7 by worker 3 from 6:42:51 to 6:42:54
Processed Order 9 by worker 2 from 6:42:52 to 6:42:54

As we can see from the results above, worker 1 processed 4 messages whereas worker 3 only processed 2 messages.

The order can sometimes be important. For example, a reservation system may processes new bookings, updates and cancellations. Let's say the traveller creates a booking and decides to cancel it immediately. When both requests are in a queue processed by multiple workers then there is a potential that the cancellation will be processed before the create.

Summary

A practical example was provided to process messages from a single queue using multiple workers by applying the competing consumers pattern.

The drawback of potentially processing messages out of order was briefly covered. With enough interest, I may create a follow up post to address the order issue using various techniques such as a sequential message convoy, applying sessions and using correlation IDs to preserve delivery order.

The next post will provide an illustration for adding instrumentation to monitor queue processing throughput. The performance of the competing consumers solution is compared with the single worker solution.

Broadcasting messages using the composite pattern

Broadcasting Messages

The previous post focused on reading messages from a queue and pushing the messages to a single command for processing.

The problem is how can we notify multiple commands to process the same request? For example, when an order is received, we may want a command to 1) send an email, 2) add an audit record and 3) call an external API.

The solution is to use the composite pattern, whereby a group of commands are treated in a similar way as a single command to fan out the messages.

Download Source Code

Setup

The core components such as the Queue Monitor were covered in the Message Queue Delivery Strategies post.

Sequential Processing

The CompositeSequentialBroadcastCommand class below implements ICommand to indicate that it exhibits the behaviour of a command. The responsibility of the composite class is to loop though a collection of commands and call each command with the same request.

  
    public interface ICommand<in T>
    {
        void Execute(T message);
    }

    public class OrderProcessorCommand : ICommand<OrderModel>
    {
        private readonly int _id;

        public OrderProcessorCommand(int id)
        {
            _id = id;
        }

        public void Execute(OrderModel message)
        {
            if (message == null) throw new ArgumentNullException("message");
            var start = DateTime.Now;
            Thread.Sleep(TimeSpan.FromSeconds(2));
            Console.WriteLine("Processed {0} by worker {1} from {2} to {3}", 
                               message.Name, _id, start.ToString("h:mm:ss"), 
                               DateTime.Now.ToString("h:mm:ss"));
        }
    }

    public class CompositeSequentialBroadcastCommand<T> : ICommand<T>
    {
        private readonly IEnumerable<ICommand<T>> _commands;

        public CompositeSequentialBroadcastCommand(IEnumerable<ICommand<T>> commands)
        {
            if (commands == null) throw new ArgumentNullException("commands");
            _commands = commands.ToList();
        }

        public void Execute(T message)
        {
            foreach (var command in _commands)
            {
                command.Execute(message);
            }
        }
    }
Let's see what happens when 3 sequential workers are configured to process 2 messages.
  
            using (var queue = new MessageQueue(@".\Private$\Orders"))
            {
                queue.Formatter = new BinaryMessageFormatter();

                var workers = new List<ICommand<OrderModel>>();
                for (var workerId = 1; workerId <= 3; workerId++)
                {
                    workers.Add(new OrderProcessorCommand(workerId));
                }
                
                var command = new CompositeSequentialBroadcastCommand<OrderModel>(workers);
                var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
                queueMonitor.Start();
            }
Processed Order 1 by worker 1 from 6:26:34 to 6:26:36
Processed Order 1 by worker 2 from 6:26:36 to 6:26:38
Processed Order 1 by worker 3 from 6:26:38 to 6:26:40
Processed Order 2 by worker 1 from 6:26:40 to 6:26:42
Processed Order 2 by worker 2 from 6:26:42 to 6:26:44
Processed Order 2 by worker 3 from 6:26:44 to 6:26:46

The output above shows that each worker processed the same message in sequence. If worker 2 failed then worker 3 would not be executed. This can be a great pattern when the sequence is important. For example, only send the welcome email if the order has been created successfully.

Parallel Processing

The CompositeParallelBroadcastCommand class is similar to the previous example. The only difference is that this class will call the commands in parallel.

  
    class CompositeParallelBroadcastCommand<T> : ICommand<T>
    {
        private readonly IEnumerable<ICommand<T>> _commands;

        public CompositeParallelBroadcastCommand(IEnumerable<ICommand<T>> commands)
        {
            if (commands == null) throw new ArgumentNullException("commands");
            _commands = commands.ToList();
        }

        public void Execute(T message)
        {
            Parallel.ForEach(_commands, c => c.Execute(message));
        }
    }
Let's see what happens when parallel 3 workers are configured to process 2 messages.
  
            using (var queue = new MessageQueue(@".\Private$\Orders"))
            {
                queue.Formatter = new BinaryMessageFormatter();

                var workers = new List<ICommand<OrderModel>>();
                for (var workerId = 1; workerId <= 3; workerId++)
                {
                    workers.Add(new OrderProcessorCommand(workerId));
                }
                
                var command = new CompositeParallelBroadcastCommand<OrderModel>(workers);
                var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
                queueMonitor.Start();
            }
Processed Order 1 by worker 2 from 6:08:36 to 6:08:38
Processed Order 1 by worker 1 from 6:08:36 to 6:08:38
Processed Order 1 by worker 3 from 6:08:36 to 6:08:38
Processed Order 2 by worker 3 from 6:08:38 to 6:08:40
Processed Order 2 by worker 2 from 6:08:38 to 6:08:40
Processed Order 2 by worker 1 from 6:08:38 to 6:08:40

The output above shows that each worker processed the same message concurrently. This can be a great pattern when sequencing doesn’t matters. For example, send a welcome email and add an audit record at the same time.

Summary

This post focused on broadcasting a single message to multiple receivers. The composite pattern allows the behaviour of the Queue Monitor push to change without modifying the code.

Coming soon: The next post will focus on processing a single request by dispatching messaging to a pool of competing consumer processors.

Message Queue Delivery Strategies

Message Queue Delivery Strategies

The previous post focused on MSMQ fundamentals using a pull approach to retrieve messages from a queue.

This post is the start of a series that covers multiple strategies to push queued messages to clients. The intention of the push approach is to keep clients agnostic of being part of a message based architecture.

MSMQ technology is used but it is easy enough to change the implementation to use an alternative queuing solution such as Azure Service Bus.

Download Source Code

Setup

Setting up an MSQM was covered in the MSMQ fundamentals post.

The following code was used for placing 3 unique OrderModel messages in the queue.

  
    [Serializable]
    public class OrderModel
    {
        public int Id { get; set; }
        public string Name { get; set; }
    }

    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();

        for (var orderId = 1; orderId <= 3; orderId++)
        {
            var order = new OrderModel()
            {
                Id = orderId,
                Name = string.Format("Order {0}", orderId)
            };
            queue.Send(order);
        }
    }

Command Pattern

The problem is how can a message infrastructure issue requests to objects without knowing anything about the operation being requested or the receiver of the request?

The solution is to use the command pattern to decouple senders from receivers. A command decouples the object that invokes the operation from the object that knows how to perform the operation.

The generic command interface is displayed below. The OrderProcessorCommand class implements the command interface to indicate that it can accept OrderModel messages, which it will use to simulate an order being processed.

  
    public interface ICommand<in T>
    {
        void Execute(T message);
    }

    public class OrderProcessorCommand : ICommand<OrderModel>
    {
        private readonly int _id;

        public OrderProcessorCommand(int id)
        {
            _id = id;
        }

        public void Execute(OrderModel message)
        {
            if (message == null) throw new ArgumentNullException("message");
            var start = DateTime.Now;

            // Simulate work being performed
            Thread.Sleep(TimeSpan.FromSeconds(2));

            Console.WriteLine("Processed {0} by worker {1} from {2} to {3}", 
                 message.Name, _id, start.ToString("h:mm:ss"), 
                 DateTime.Now.ToString("h:mm:ss"));
        }
    }
Note that a sleep was added on line 21 to simulate work being performed by the processor.

Queue Monitor

The queue monitor class acts as orchestrator, which is responsible for listening to the queue for incoming messages and calling a command to execute each message.

When the client calls the start method then the workflow outlined below will run consciously until the client calls the stop method:

  1. The BeginReceive method will kick off the queue listing operation.
  2. The OnReceiveComplete event will be raised when a message arrives.
  3. The command will be executed by passing in the message content.
  
    public interface IQueueMonitor : IDisposable
    {
        void Start();
        void Stop();
    }

    public class QueueMonitor<T> : IQueueMonitor
    {
        private readonly MessageQueue _queue;
        private readonly ICommand<T> _command;
        private bool _continue = true;

        public QueueMonitor(MessageQueue queue, ICommand<T> command)
        {
            if (queue == null) throw new ArgumentNullException("queue");
            if (command == null) throw new ArgumentNullException("command");
            _queue = queue;
            _command = command;

            _queue.ReceiveCompleted += OnReceiveCompleted;
        }

        private void OnReceiveCompleted(object sender, 
                             ReceiveCompletedEventArgs receiveCompletedEventArgs)
        {
            var message = _queue.EndReceive(receiveCompletedEventArgs.AsyncResult);
            _command.Execute((T)message.Body);
            if (_continue) _queue.BeginReceive();
        }

        public void Start()
        {
            _continue = true;
            _queue.BeginReceive();
        }

        public void Stop()
        {
            _continue = false;
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool isDisposing)
        {
            if (!isDisposing || _queue == null) return;
            _queue.ReceiveCompleted -= OnReceiveCompleted;
            _queue.Dispose();
        }
    }

Single Receiver

Let's see the gears ticking over by processing the messages on the queue.

  
            using (var queue = new MessageQueue(@".\Private$\Orders"))
            {
                var command = new OrderProcessorCommand(1);
                var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
                queueMonitor.Start();
            }
Processed Order 1 by worker 1 from 6:20:11 to 6:20:13
Processed Order 2 by worker 1 from 6:20:13 to 6:20:15
Processed Order 3 by worker 1 from 6:20:15 to 6:20:17 

The output above shows that the single receiver processed the messages in order, one at a time.

The drawback of the single receiver is the finite amount of throughput due to the constraint of processing one message at time.

Summary

This post demonstrated a generic approach to continually monitor a queue for new messages and pushing the message content to a command to execute.

The next post will describe how to broadcast a single message across multiple processors.

How to get started with Microsoft Message Queuing MSMQ

MSQM Fundamentals

The previous post described how to design a highly scalable solution using queue oriented architecture.

This post will cover the fundamentals to get started with Microsoft Message Queuing (MSMQ). Code examples are provided to illustrate how to create a queue, write messages to a queue and read messages from a queue synchronously and asynchronously.

Download Source Code

Setup

The pre-requisite is to install MSMQ, which comes free with Windows.

Creating a queue

The following code was used for creating the queue named orders.

  
using System.Messaging;

if (!MessageQueue.Exists(@".\Private$\Orders"))
{
    MessageQueue.Create(@".\Private$\Orders");
} 

The orders queue and messages on the queue can be viewed using Computer Management as shown below.

MSQM Computer Management

Writing Messages

The following code was used for writing the OrderModel DTO instance (message) to the queue using a BinaryMessageFormatter.

  
    [Serializable]
    public class OrderModel
    {
        public int Id { get; set; }
        public string Name { get; set; }
    }

    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();

        var order = new OrderModel()
        {
            Id = 123,
            Name = "Demo Order"
        };
        queue.Send(order);
    }

Reading Messages

Blocking Synchronous Read

The following code was used for reading a message from the queue. The thread will be blocked on the receive method on line 5 until a message is available.
  
    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();

        var message = queue.Receive();
        var order = (OrderModel)message.Body;
    }

A read timeout duration can be specified. The code below will use a 1 sec timeout limit and catch the MessageQueueException raised due to the timeout. A custom ReadTimeoutException is thrown to notify the client that a timeout has occured.

  
    public class ReadTimeoutException : Exception
    {
        public ReadTimeoutException(string message, Exception innerException) 
               : base(message, innerException)
        {
            
        }
    }

    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();
        Message message = null;

        try
        {
            message = queue.Receive(TimeSpan.FromSeconds(1));
        }
        catch (MessageQueueException mqException)
        {
           if (mqException.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
           {
               throw new ReadTimeoutException("Reading from the queue timed out.", 
                            mqException);
           }
           throw;
        }
    }

Asynchronous Read

The following code was used to perform an async read in conjunction with a Task.
  
private async Task<OrderModel> ReadAsync()
{
    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();

        var message = await Task.Factory.FromAsync<Message>(
                              queue.BeginReceive(), queue.EndReceive);
        return (OrderModel)message.Body;
    }
}

Summary

This post covered the fundamentals of using a MSMQ to read and write messages.

The next post describes various message delivery strategies.

Design a highly scalable publish-subscribe solution

Design Scalable Solutions

Message oriented architecture is a great option to produce highly scalable, extendible and loosely coupled solutions. The message architecture describes the principal of using a software system that can send and receive (usually asynchronously) messages to one or more queues, so that services can interact without needing to know specific details about each other.

The problem is how can an application in an integrated architecture communicate with other applications that are interested in receiving messages without knowing the identities of the receivers?

The solution is to extend the communication infrastructure by creating topics or by dynamically inspecting message content. This allows applications to subscribe to specific messages.

This post will provide an illustration to turn a monolithic application into a distributed, highly scalable solution.

The Monolith

The conceptual model of a monolithic application is displayed below.

Monolithic Application

The application above consists of a collection of tightly coupled components. The side-effects are:

  • Cumbersome release management - the entire application must be upgraded in order to release a new feature or fix a bug in any of the components.
  • Low performance monitoring - isolating and measuring component throughput is difficult when all of the components are competing for the same resources.
  • Poor scalability - dedicated resources cannot be allocated to individual components.
  • High security risk - an attacker can gain access to all of the external dependencies once the application has been compromised.

Publish-Subscribe Solution

The conceptual model of a loosely coupled, Public-Subscribe (Pub-Sub) solution is displayed below.

Monolithic Application
The solution above consists of a queue based messaging system to decouple the components. The advantages are:
  • Resource Isolation - each component can be hosted on a dedicated or shared environment.
  • Decoupling - the online store's only responsibility is to write a message to a queue when an event occurs and therefore doesn't need to know who the subscribers are.
  • Extendible - subscribers can be added or removed.
  • Robust deployment - a single component can be upgraded without impacting others.
  • Secure - an attacker needs to compromise all of the isolated components to gain access to the entire ecosystem.
  • Testable - it is a lot easier to test component behaviour in isolation when the input and execution paths are limited.

Recovering from failure

The image below depicts the mail server being unavailable temporarily.

Queued Messages during temporary outages

Recovering from temporary outages is easy since messages will continue to queue up.

Messages will not be lost during an outage. Dequeuing will occur once the service becomes available again.

Monitoring the queue length provides insight into the overall health of the system. The diagnosis of a high queue length is typically:

  • An indication that the subscriber service is down.
  • The subscriber is over-utilized and does not have the capacity to service the incoming messages fast enough.

Peak Load can be handled gracefully without stressing out resources since the email notification component will continually dequeue and process messages, one after the other, regardless of the number of messages in the queue. The email notification component will eventually catch up during periods of lower than normal use.

Load Distribution and Redundancy

The image below depicts distributing the mail server role across multiple workers.

Load Distribution and Redundancy

Workload distribution can be achieved by deploying additional subscribers to dequeue and process messages from the same queue.

High availability, scalability and redundancy capabilities are provided when multiple subscribers are deployed to service the same queue. The system will remain online during maintenance by upgrading one subscriber at a time.

Performance benchmarking can be achieved by monitoring the queue length. Throughput can be predicated using the follow approach:

  1. Stopping the component server
  2. Filling the queue up with a large quantity of messages
  3. Starting the component server
  4. Measuring the number of messages that are processed per minute by monitoring the queue length

Lower latency can be achieved with geo-distributed deployments. For example, the Loyalty system, which is responsible for communicating with an external API, can be hosted in the same region as the API.

Summary

This post provided a laundry list of benefits for designing applications using a message based architectural pattern compared to traditional highly coupled monolithic applications. The Pub-Sub architecture is a proven and reliable approach to produce highly scalable solutions.