Software Design Blog

Simple solutions to solve complex problems

Message Queue Delivery Strategies

The previous post focused on MSMQ fundamentals using a pull approach to retrieve messages from a queue.

This post is the start of a series that covers multiple strategies to push queued messages to clients. The intention of the push approach is to keep clients agnostic of being part of a message-based architecture.

MSMQ technology is used, but it is easy enough to swap the implementation for an alternative queuing solution such as Azure Service Bus.
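
To keep that swap easy, queue access can sit behind a small abstraction so that clients never reference MSMQ types directly. The sketch below is illustrative only; the IMessageQueueClient interface and MsmqClient class are hypothetical names, not part of the original series.

    // Hypothetical queue abstraction (requires System and System.Messaging).
    public interface IMessageQueueClient<T> : IDisposable
    {
        void Send(T message);
        T Receive(TimeSpan timeout);
    }

    // One possible MSMQ-backed implementation. An Azure Service Bus version
    // would implement the same interface without changing the calling code.
    public class MsmqClient<T> : IMessageQueueClient<T>
    {
        private readonly MessageQueue _queue;

        public MsmqClient(string path)
        {
            _queue = new MessageQueue(path) { Formatter = new BinaryMessageFormatter() };
        }

        public void Send(T message)
        {
            _queue.Send(message);
        }

        public T Receive(TimeSpan timeout)
        {
            return (T)_queue.Receive(timeout).Body;
        }

        public void Dispose()
        {
            _queue.Dispose();
        }
    }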

Download Source Code

Setup

Setting up an MSMQ queue was covered in the MSMQ fundamentals post.

The following code was used to place three unique OrderModel messages on the queue.

  
    [Serializable]
    public class OrderModel
    {
        public int Id { get; set; }
        public string Name { get; set; }
    }

    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();

        for (var orderId = 1; orderId <= 3; orderId++)
        {
            var order = new OrderModel()
            {
                Id = orderId,
                Name = string.Format("Order {0}", orderId)
            };
            queue.Send(order);
        }
    }

Command Pattern

The problem is: how can the messaging infrastructure issue requests to objects without knowing anything about the operation being requested or the receiver of the request?

The solution is to use the command pattern to decouple senders from receivers. A command decouples the object that invokes the operation from the object that knows how to perform the operation.

The generic command interface is displayed below. The OrderProcessorCommand class implements the command interface to indicate that it can accept OrderModel messages, which it will use to simulate an order being processed.

  
    public interface ICommand<in T>
    {
        void Execute(T message);
    }

    public class OrderProcessorCommand : ICommand<OrderModel>
    {
        private readonly int _id;

        public OrderProcessorCommand(int id)
        {
            _id = id;
        }

        public void Execute(OrderModel message)
        {
            if (message == null) throw new ArgumentNullException("message");
            var start = DateTime.Now;

            // Simulate work being performed
            Thread.Sleep(TimeSpan.FromSeconds(2));

            Console.WriteLine("Processed {0} by worker {1} from {2} to {3}", 
                 message.Name, _id, start.ToString("h:mm:ss"), 
                 DateTime.Now.ToString("h:mm:ss"));
        }
    }

Note that a sleep was added in the Execute method to simulate work being performed by the processor.

Queue Monitor

The queue monitor class acts as the orchestrator: it is responsible for listening to the queue for incoming messages and invoking a command to execute each message.

When the client calls the Start method, the workflow outlined below will run continuously until the client calls the Stop method:

  1. The BeginReceive method kicks off the queue listening operation.
  2. The ReceiveCompleted event is raised when a message arrives and is handled by OnReceiveCompleted.
  3. The command is executed by passing in the message content.
  
    public interface IQueueMonitor : IDisposable
    {
        void Start();
        void Stop();
    }

    public class QueueMonitor<T> : IQueueMonitor
    {
        private readonly MessageQueue _queue;
        private readonly ICommand<T> _command;
        private bool _continue = true;

        public QueueMonitor(MessageQueue queue, ICommand<T> command)
        {
            if (queue == null) throw new ArgumentNullException("queue");
            if (command == null) throw new ArgumentNullException("command");
            _queue = queue;
            _command = command;

            _queue.ReceiveCompleted += OnReceiveCompleted;
        }

        private void OnReceiveCompleted(object sender, 
                             ReceiveCompletedEventArgs receiveCompletedEventArgs)
        {
            var message = _queue.EndReceive(receiveCompletedEventArgs.AsyncResult);
            _command.Execute((T)message.Body);
            if (_continue) _queue.BeginReceive();
        }

        public void Start()
        {
            _continue = true;
            _queue.BeginReceive();
        }

        public void Stop()
        {
            _continue = false;
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool isDisposing)
        {
            if (!isDisposing || _queue == null) return;
            _queue.ReceiveCompleted -= OnReceiveCompleted;
            _queue.Dispose();
        }
    }

Single Receiver

Let's see the gears ticking over by processing the messages on the queue.

  
    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();

        var command = new OrderProcessorCommand(1);
        var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
        queueMonitor.Start();

        // Keep the process alive; messages are received and processed asynchronously.
        Console.ReadLine();
    }
Processed Order 1 by worker 1 from 6:20:11 to 6:20:13
Processed Order 2 by worker 1 from 6:20:13 to 6:20:15
Processed Order 3 by worker 1 from 6:20:15 to 6:20:17 

The output above shows that the single receiver processed the messages in order, one at a time.

The drawback of the single receiver is the finite amount of throughput due to the constraint of processing one message at a time.

Summary

This post demonstrated a generic approach to continually monitor a queue for new messages and push the message content to a command for execution.

The next post will describe how to broadcast a single message across multiple processors.

How to get started with Microsoft Message Queuing (MSMQ)

MSMQ Fundamentals

The previous post described how to design a highly scalable solution using queue oriented architecture.

This post will cover the fundamentals to get started with Microsoft Message Queuing (MSMQ). Code examples are provided to illustrate how to create a queue, write messages to a queue and read messages from a queue synchronously and asynchronously.

Download Source Code

Setup

The prerequisite is to install MSMQ, which is included with Windows as an optional feature.

Creating a queue

The following code was used to create a private queue named Orders.

  
using System.Messaging;

if (!MessageQueue.Exists(@".\Private$\Orders"))
{
    MessageQueue.Create(@".\Private$\Orders");
} 

The orders queue and messages on the queue can be viewed using Computer Management as shown below.

MSMQ Computer Management

Writing Messages

The following code was used for writing the OrderModel DTO instance (message) to the queue using a BinaryMessageFormatter.

  
    [Serializable]
    public class OrderModel
    {
        public int Id { get; set; }
        public string Name { get; set; }
    }

    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();

        var order = new OrderModel()
        {
            Id = 123,
            Name = "Demo Order"
        };
        queue.Send(order);
    }

Reading Messages

Blocking Synchronous Read

The following code was used for reading a message from the queue. The thread is blocked on the Receive method until a message is available.
  
    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();

        var message = queue.Receive();
        var order = (OrderModel)message.Body;
    }

A read timeout duration can be specified. The code below uses a one-second timeout and catches the MessageQueueException raised when the timeout elapses. A custom ReadTimeoutException is thrown to notify the client that a timeout has occurred.

  
    public class ReadTimeoutException : Exception
    {
        public ReadTimeoutException(string message, Exception innerException) 
               : base(message, innerException)
        {
            
        }
    }

    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();
        Message message = null;

        try
        {
            message = queue.Receive(TimeSpan.FromSeconds(1));
        }
        catch (MessageQueueException mqException)
        {
           if (mqException.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
           {
               throw new ReadTimeoutException("Reading from the queue timed out.", 
                            mqException);
           }
           throw;
        }
    }

Asynchronous Read

The following code was used to perform an async read in conjunction with a Task.
  
private async Task<OrderModel> ReadAsync()
{
    using (var queue = new MessageQueue(@".\Private$\Orders"))
    {
        queue.Formatter = new BinaryMessageFormatter();

        var message = await Task.Factory.FromAsync<Message>(
                              queue.BeginReceive(), queue.EndReceive);
        return (OrderModel)message.Body;
    }
}

Summary

This post covered the fundamentals of using MSMQ to read and write messages.

The next post describes various message delivery strategies.

Design a highly scalable publish-subscribe solution

Design Scalable Solutions

Message-oriented architecture is a great option for producing highly scalable, extensible and loosely coupled solutions. It is based on the principle of a software system sending and receiving messages (usually asynchronously) via one or more queues, so that services can interact without needing to know specific details about each other.

The problem is how can an application in an integrated architecture communicate with other applications that are interested in receiving messages without knowing the identities of the receivers?

The solution is to extend the communication infrastructure with topics, or by dynamically inspecting message content, so that applications can subscribe to the specific messages they are interested in.
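
As a rough illustration of topic-based routing on top of MSMQ, the sketch below publishes a message to every subscriber queue registered against a topic. The TopicPublisher class, topic name and queue paths are hypothetical and only meant to convey the idea.

    // Hypothetical topic-based publisher (requires System.Collections.Generic and System.Messaging).
    public class TopicPublisher
    {
        // Each topic maps to the private queues of its subscribers.
        private readonly Dictionary<string, List<string>> _topics =
            new Dictionary<string, List<string>>
            {
                { "OrderPlaced", new List<string>
                    { @".\Private$\EmailNotifications", @".\Private$\Loyalty" } }
            };

        public void Publish(string topic, object message)
        {
            List<string> queuePaths;
            if (!_topics.TryGetValue(topic, out queuePaths)) return;

            foreach (var path in queuePaths)
            {
                using (var queue = new MessageQueue(path))
                {
                    queue.Formatter = new BinaryMessageFormatter();

                    // The topic doubles as the message label for easy inspection.
                    queue.Send(message, topic);
                }
            }
        }
    }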

This post will provide an illustration to turn a monolithic application into a distributed, highly scalable solution.

The Monolith

The conceptual model of a monolithic application is displayed below.

Monolithic Application

The application above consists of a collection of tightly coupled components. The side-effects are:

  • Cumbersome release management - the entire application must be upgraded in order to release a new feature or fix a bug in any of the components.
  • Limited performance visibility - isolating and measuring component throughput is difficult when all of the components compete for the same resources.
  • Poor scalability - dedicated resources cannot be allocated to individual components.
  • High security risk - an attacker can gain access to all of the external dependencies once the application has been compromised.

Publish-Subscribe Solution

The conceptual model of a loosely coupled, Publish-Subscribe (Pub-Sub) solution is displayed below.

Publish-Subscribe Solution

The solution above consists of a queue-based messaging system that decouples the components. The advantages are:
  • Resource Isolation - each component can be hosted on a dedicated or shared environment.
  • Decoupling - the online store's only responsibility is to write a message to a queue when an event occurs and therefore doesn't need to know who the subscribers are.
  • Extensible - subscribers can be added or removed without changing the publisher.
  • Robust deployment - a single component can be upgraded without impacting others.
  • Secure - an attacker needs to compromise all of the isolated components to gain access to the entire ecosystem.
  • Testable - it is a lot easier to test component behaviour in isolation when the input and execution paths are limited.

Recovering from failure

The image below depicts the mail server being unavailable temporarily.

Queued Messages during temporary outages

Recovering from temporary outages is easy since messages will continue to queue up.

Messages will not be lost during an outage. Dequeuing will occur once the service becomes available again.

Monitoring the queue length provides insight into the overall health of the system; a basic queue-depth check is sketched after the list below. A high queue length typically indicates one of the following:

  • The subscriber service is down.
  • The subscriber is over-utilized and does not have the capacity to process the incoming messages fast enough.
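
As a rough sketch of such a health check (the GetQueueDepth helper and the threshold value below are illustrative, not part of the original post), the queue depth can be obtained by enumerating the queue:

    // Counts the messages currently sitting on a queue (requires System.Messaging).
    public static int GetQueueDepth(string path)
    {
        using (var queue = new MessageQueue(path))
        {
            var count = 0;
            var enumerator = queue.GetMessageEnumerator2();
            while (enumerator.MoveNext()) count++;
            return count;
        }
    }

    // Example health check with an arbitrary threshold.
    if (GetQueueDepth(@".\Private$\Orders") > 1000)
    {
        Console.WriteLine("Queue backlog is high - the subscriber may be down or overloaded.");
    }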

Peak load can be handled gracefully without stressing resources, since the email notification component will continually dequeue and process messages, one after the other, regardless of the number of messages in the queue. The email notification component will eventually catch up during periods of lower than normal use.

Load Distribution and Redundancy

The image below depicts distributing the mail server role across multiple workers.

Load Distribution and Redundancy

Workload distribution can be achieved by deploying additional subscribers to dequeue and process messages from the same queue.

High availability, scalability and redundancy capabilities are provided when multiple subscribers are deployed to service the same queue. The system will remain online during maintenance by upgrading one subscriber at a time.
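
As a minimal sketch of this competing-consumers setup, reusing the QueueMonitor<T> and OrderProcessorCommand types from the delivery strategies post (the worker count of three is arbitrary), each worker gets its own connection to the same queue:

    // Three workers competing for messages on the same Orders queue.
    var monitors = new List<IQueueMonitor>();

    for (var workerId = 1; workerId <= 3; workerId++)
    {
        var queue = new MessageQueue(@".\Private$\Orders")
        {
            Formatter = new BinaryMessageFormatter()
        };
        var monitor = new QueueMonitor<OrderModel>(queue, new OrderProcessorCommand(workerId));
        monitor.Start();
        monitors.Add(monitor);
    }

    // Keep the process alive while the workers process messages asynchronously.
    Console.ReadLine();

    // Disposing a monitor also unsubscribes it and disposes its queue.
    foreach (var monitor in monitors)
    {
        monitor.Dispose();
    }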

Performance benchmarking can be achieved by monitoring the queue length. Throughput can be predicted using the following approach (a rough sketch of the measurement follows the list):

  1. Stopping the component server
  2. Filling the queue up with a large quantity of messages
  3. Starting the component server
  4. Measuring the number of messages that are processed per minute by monitoring the queue length
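
As a rough sketch of step 4 (the queue path and one-minute sampling interval are arbitrary), throughput can be derived by sampling the queue length before and after a fixed period:

    // Measures messages processed per minute by sampling the queue length.
    // Requires System, System.Messaging and System.Threading.
    Func<int> queueDepth = () =>
    {
        using (var queue = new MessageQueue(@".\Private$\Orders"))
        {
            var count = 0;
            var enumerator = queue.GetMessageEnumerator2();
            while (enumerator.MoveNext()) count++;
            return count;
        }
    };

    var before = queueDepth();
    Thread.Sleep(TimeSpan.FromMinutes(1));
    var after = queueDepth();

    Console.WriteLine("Throughput: {0} messages per minute", before - after);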

Lower latency can be achieved with geo-distributed deployments. For example, the Loyalty system, which is responsible for communicating with an external API, can be hosted in the same region as the API.

Summary

This post provided a laundry list of benefits of designing applications using a message-based architectural pattern compared to traditional tightly coupled monolithic applications. The Pub-Sub architecture is a proven and reliable approach to produce highly scalable solutions.