Software Design Blog - Software Architecture Simple solutions to solve complex problems / BlogEngine.NET en-US /opml.axd Jay Strydom Software Design Blog 0.000000 0.000000 Message Queue Delivery Strategies <img src="/pics/banners/AdvancedMSMQBlog.jpg" class="img-responsive" alt="Message Queue Delivery Strategies"> <br> <p>The previous post focused on <a href="/post/how-to-get-started-with-microsoft-message-queuing-msmq">MSMQ fundamentals</a> using a <b>pull</b> approach to retrieve messages from a queue.</p> <p>This post is the start of a series that covers multiple strategies to <b>push</b> queued messages to clients. The intention of the push approach is to keep clients agnostic of being part of a message based architecture.</p> <p>MSMQ technology is used but it is easy enough to change the implementation to use an alternative queuing solution such as Azure Service Bus.</p> <a href="/Downloads/" role="button" class="btn btn-primary btn-sm">Download Source Code</a> <h3>Setup</h3> <p>Setting up an MSQM was covered in the <a href="/post/how-to-get-started-with-microsoft-message-queuing-msmq">MSMQ fundamentals</a> post.</p> <p>The following code was used for placing 3 unique OrderModel messages in the queue.</p> <pre class="brush: c-sharp;"> [Serializable] public class OrderModel { public int Id { get; set; } public string Name { get; set; } } using (var queue = new MessageQueue(@".\Private$\Orders")) { queue.Formatter = new BinaryMessageFormatter(); for (var orderId = 1; orderId &lt;= 3; orderId++) { var order = new OrderModel() { Id = orderId, Name = string.Format("Order {0}", orderId) }; queue.Send(order); } } </pre> <h3>Command Pattern</h3> <p><b>The problem</b> is how can a message infrastructure issue requests to objects without knowing anything about the operation being requested or the receiver of the request?</p> <p><b>The solution</b> is to use the command pattern to decouple senders from receivers. A command decouples the object that invokes the operation from the object that knows how to perform the operation.</p> <p>The generic command interface is displayed below. The OrderProcessorCommand class implements the command interface to indicate that it can accept OrderModel messages, which it will use to simulate an order being processed.</p> <pre class="brush: c-sharp;"> public interface ICommand&lt;in T&gt; { void Execute(T message); } public class OrderProcessorCommand : ICommand&lt;OrderModel&gt; { private readonly int _id; public OrderProcessorCommand(int id) { _id = id; } public void Execute(OrderModel message) { if (message == null) throw new ArgumentNullException("message"); var start = DateTime.Now; // Simulate work being performed Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Processed {0} by worker {1} from {2} to {3}", message.Name, _id, start.ToString("h:mm:ss"), DateTime.Now.ToString("h:mm:ss")); } } </pre> Note that a sleep was added on line 21 to simulate work being performed by the processor. <h3>Queue Monitor</h3> <p>The queue monitor class acts as orchestrator, which is responsible for listening to the queue for incoming messages and calling a command to execute each message.</p> <p>When the client calls the start method then the workflow outlined below will run consciously until the client calls the stop method:</p> <ol> <li>The BeginReceive method will kick off the queue listing operation.</li> <li>The OnReceiveComplete event will be raised when a message arrives.</li> <li>The command will be executed by passing in the message content.</li> </ol> <pre class="brush: c-sharp;"> public interface IQueueMonitor : IDisposable { void Start(); void Stop(); } public class QueueMonitor&lt;T&gt; : IQueueMonitor { private readonly MessageQueue _queue; private readonly ICommand&lt;T&gt; _command; private bool _continue = true; public QueueMonitor(MessageQueue queue, ICommand&lt;T&gt; command) { if (queue == null) throw new ArgumentNullException("queue"); if (command == null) throw new ArgumentNullException("command"); _queue = queue; _command = command; _queue.ReceiveCompleted += OnReceiveCompleted; } private void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs receiveCompletedEventArgs) { var message = _queue.EndReceive(receiveCompletedEventArgs.AsyncResult); _command.Execute((T)message.Body); if (_continue) _queue.BeginReceive(); } public void Start() { _continue = true; _queue.BeginReceive(); } public void Stop() { _continue = false; } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool isDisposing) { if (!isDisposing || _queue == null) return; _queue.ReceiveCompleted -= OnReceiveCompleted; _queue.Dispose(); } } </pre> <div class="alert alert-info" role="alert"> <b>Note:</b> The push method will not overwhelm the client with requests. The queue monitor will be blocked on line 27 and will not retrieve the next message in the queue until the command has finished processing the current request. </div> <h3>Single Receiver</h3> <p>Let's see the gears ticking over by processing the messages on the queue.</p> <pre class="brush: c-sharp;"> using (var queue = new MessageQueue(@".\Private$\Orders")) { var command = new OrderProcessorCommand(1); var queueMonitor = new QueueMonitor&lt;OrderModel&gt;(queue, command); queueMonitor.Start(); } </pre> <pre>Processed Order 1 by worker 1 from 6:20:11 to 6:20:13 Processed Order 2 by worker 1 from 6:20:13 to 6:20:15 Processed Order 3 by worker 1 from 6:20:15 to 6:20:17 </pre> <p>The output above shows that the single receiver processed the messages in order, one at a time.</p> <p><b>The drawback</b> of the single receiver is the finite amount of throughput due to the constraint of processing one message at time.</p> <h3>Summary</h3> <p> This post demonstrated a generic approach to continually monitor a queue for new messages and pushing the message content to a command to execute. </p> <p>The next post will describe how to <a href="/post/broadcasting-messages-using-the-composite-pattern">broadcast a single message across multiple processors</a>.</p> /post/message-queue-delivery-strategies /post/message-queue-delivery-strategies#comment /post.aspx?id=b1aa59bc-113f-49bf-8efa-065c0e72b4b4 Mon, 21 Dec 2015 23:33:00 +1300 Design Patterns Command Pattern Software Architecture Queues Queues Design Patterns Command Pattern Jay Strydom /pingback.axd /post.aspx?id=b1aa59bc-113f-49bf-8efa-065c0e72b4b4 0 /trackback.axd?id=b1aa59bc-113f-49bf-8efa-065c0e72b4b4 /post/message-queue-delivery-strategies#comment /syndication.axd?post=b1aa59bc-113f-49bf-8efa-065c0e72b4b4 How to get started with Microsoft Message Queuing MSMQ <img src="/pics/banners/MSMQFundamentalsBlog.jpg" class="img-responsive" alt="MSQM Fundamentals"> <br> <p>The previous post described how to <a href="/post/design-a-highly-scalable-publish-subscribe-solution">design a highly scalable solution</a> using queue oriented architecture.</p> <p>This post will cover the fundamentals to get started with Microsoft Message Queuing (MSMQ). Code examples are provided to illustrate how to create a queue, write messages to a queue and read messages from a queue synchronously and asynchronously.</p> <a href="/Downloads/" role="button" class="btn btn-primary btn-sm">Download Source Code</a> <h3>Setup</h3> <p>The pre-requisite is to <a href="" target="_blank">install MSMQ</a>, which comes free with Windows.</p> <h3>Creating a queue</h3> <p>The following code was used for creating the queue named <i>orders</i>.</p> <pre class="brush: c-sharp;"> using System.Messaging; if (!MessageQueue.Exists(@".\Private$\Orders")) { MessageQueue.Create(@".\Private$\Orders"); } </pre> <div class="alert alert-info" role="alert"> <b>Note:</b> Creating queues on a server requires powerful permissions. You may want to consider creating queues via an installer as 1) installers typically run as highly privileged users and 2) application security can be improved by following the principle of least privilege. </div> <p>The <i>orders</i> queue and messages on the queue can be viewed using Computer Management as shown below.</p> <img src="/pics/blogs/ComputerManagementQueues.png" class="img-responsive" alt="MSQM Computer Management"> <h3>Writing Messages</h3> <p>The following code was used for writing the OrderModel DTO instance (message) to the queue using a BinaryMessageFormatter.</p> <pre class="brush: c-sharp;"> [Serializable] public class OrderModel { public int Id { get; set; } public string Name { get; set; } } using (var queue = new MessageQueue(@".\Private$\Orders")) { queue.Formatter = new BinaryMessageFormatter(); var order = new OrderModel() { Id = 123, Name = "Demo Order" }; queue.Send(order); } </pre> <h3>Reading Messages</h3> <h4>Blocking Synchronous Read</h4> The following code was used for reading a message from the queue. The thread will be blocked on the receive method on line 5 until a message is available. <pre class="brush: c-sharp;"> using (var queue = new MessageQueue(@".\Private$\Orders")) { queue.Formatter = new BinaryMessageFormatter(); var message = queue.Receive(); var order = (OrderModel)message.Body; } </pre> <p>A <b>read timeout duration</b> can be specified. The code below will use a 1 sec timeout limit and catch the MessageQueueException raised due to the timeout. A custom ReadTimeoutException is thrown to notify the client that a timeout has occured.</p> <pre class="brush: c-sharp;"> public class ReadTimeoutException : Exception { public ReadTimeoutException(string message, Exception innerException) : base(message, innerException) { } } using (var queue = new MessageQueue(@".\Private$\Orders")) { queue.Formatter = new BinaryMessageFormatter(); Message message = null; try { message = queue.Receive(TimeSpan.FromSeconds(1)); } catch (MessageQueueException mqException) { if (mqException.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) { throw new ReadTimeoutException("Reading from the queue timed out.", mqException); } throw; } } </pre> <div class="alert alert-info" role="alert"> <b>Note:</b> Throwing a custom exception such as ReadTimeoutException is considered good practice because we can swap the queueing implementation with different technology such as Azure Service Bus without modifying the client code. </div> <h4>Asynchronous Read</h4> The following code was used to perform an async read in conjunction with a Task. <pre class="brush: c-sharp;"> private async Task&lt;OrderModel&gt; ReadAsync() { using (var queue = new MessageQueue(@".\Private$\Orders")) { queue.Formatter = new BinaryMessageFormatter(); var message = await Task.Factory.FromAsync&lt;Message&gt;( queue.BeginReceive(), queue.EndReceive); return (OrderModel)message.Body; } } </pre> <h3>Summary</h3> <p>This post covered the fundamentals of using a MSMQ to read and write messages.</p> <p>The next post describes various <a href="/post/message-queue-delivery-strategies">message delivery strategies</a>.</p> /post/how-to-get-started-with-microsoft-message-queuing-msmq /post/how-to-get-started-with-microsoft-message-queuing-msmq#comment /post.aspx?id=f415a5c3-8cfd-4e52-8883-d0c73fdf9103 Sun, 20 Dec 2015 23:34:00 +1300 Software Architecture Queues Software Design MSMQ Queues Jay Strydom /pingback.axd /post.aspx?id=f415a5c3-8cfd-4e52-8883-d0c73fdf9103 0 /trackback.axd?id=f415a5c3-8cfd-4e52-8883-d0c73fdf9103 /post/how-to-get-started-with-microsoft-message-queuing-msmq#comment /syndication.axd?post=f415a5c3-8cfd-4e52-8883-d0c73fdf9103 Design a highly scalable publish-subscribe solution <img src="/pics/banners/DesignHighlyScalableSolutionBlog.jpg" class="img-responsive" alt="Design Scalable Solutions"> <br> <p>Message oriented architecture is a great option to produce highly scalable, extendible and loosely coupled solutions. The message architecture describes the principal of using a software system that can send and receive (usually asynchronously) messages to one or more queues, so that services can interact without needing to know specific details about each other.</p> <p><b>The problem</b> is how can an application in an integrated architecture communicate with other applications that are interested in receiving messages without knowing the identities of the receivers?</p> <p><b>The solution</b> is to extend the communication infrastructure by creating topics or by dynamically inspecting message content. This allows applications to subscribe to specific messages.</p> <p>This post will provide an illustration to turn a monolithic application into a distributed, highly scalable solution.</p> <h3>The Monolith</h3> <p>The conceptual model of a monolithic application is displayed below.</p> <img src="/pics/blogs/BlogQueueDesign_MonolithicApp.png" class="img-responsive" alt="Monolithic Application"> <br> <p>The application above consists of a collection of tightly coupled components. The side-effects are:</p> <ul> <li><b>Cumbersome release management</b> - the entire application must be upgraded in order to release a new feature or fix a bug in any of the components.</li> <li><b>Low performance monitoring</b> - isolating and measuring component throughput is difficult when all of the components are competing for the same resources.</li> <li><b>Poor scalability</b> - dedicated resources cannot be allocated to individual components.</li> <li><b>High security risk</b> - an attacker can gain access to all of the external dependencies once the application has been compromised.</li> </ul> <h3>Publish-Subscribe Solution</h3> <p>The conceptual model of a loosely coupled, Public-Subscribe (Pub-Sub) solution is displayed below.</p> <img src="/pics/blogs/BlogQueueDesign_PubSub.png" class="img-responsive" alt="Monolithic Application"> <br> The solution above consists of a queue based messaging system to decouple the components. The advantages are: <ul> <li><b>Resource Isolation</b> - each component can be hosted on a dedicated or shared environment.</li> <li><b>Decoupling</b> - the online store's only responsibility is to write a message to a queue when an event occurs and therefore doesn't need to know who the subscribers are.</li> <li><b>Extendible</b> - subscribers can be added or removed.</li> <li><b>Robust deployment</b> - a single component can be upgraded without impacting others.</li> <li><b>Secure</b> - an attacker needs to compromise all of the isolated components to gain access to the entire ecosystem.</li> <li><b>Testable</b> - it is a lot easier to test component behaviour in isolation when the input and execution paths are limited.</li> </ul> <h4>Recovering from failure</h4> <p>The image below depicts the mail server being unavailable temporarily.</p> <img src="/pics/blogs/BlogQueueDesign_MonitorFailure.png" class="img-responsive" alt="Queued Messages during temporary outages"> <br> <p><b>Recovering from temporary outages</b> is easy since messages will continue to queue up.</p> <p><b>Messages will not be lost</b> during an outage. Dequeuing will occur once the service becomes available again.</p> <p><b>Monitoring the queue length</b> provides insight into the overall health of the system. The diagnosis of a high queue length is typically:</p> <ul> <li>An indication that the subscriber service is down.</li> <li>The subscriber is over-utilized and does not have the capacity to service the incoming messages fast enough.</li> </ul> <p><b>Peak Load</b> can be handled gracefully without stressing out resources since the email notification component will continually dequeue and process messages, one after the other, regardless of the number of messages in the queue. The email notification component will eventually catch up during periods of lower than normal use.</p> <h4>Load Distribution and Redundancy</h4> <p>The image below depicts distributing the mail server role across multiple workers.</p> <img src="/pics/blogs/BlogQueueDesign_LoadBalancing.png " class="img-responsive" alt="Load Distribution and Redundancy"> <br> <p><b>Workload distribution</b> can be achieved by deploying additional subscribers to dequeue and process messages from the same queue.</p> <p><b>High availability, scalability and redundancy</b> capabilities are provided when multiple subscribers are deployed to service the same queue. The system will remain online during maintenance by upgrading one subscriber at a time.</p> <p><b>Performance benchmarking</b> can be achieved by monitoring the queue length. Throughput can be predicated using the follow approach: </p><ol> <li>Stopping the component server</li> <li>Filling the queue up with a large quantity of messages</li> <li>Starting the component server</li> <li>Measuring the number of messages that are processed per minute by monitoring the queue length</li> </ol> <p><b>Lower latency</b> can be achieved with geo-distributed deployments. For example, the Loyalty system, which is responsible for communicating with an external API, can be hosted in the same region as the API.</p> <h3>Summary</h3> This post provided a laundry list of benefits for designing applications using a message based architectural pattern compared to traditional highly coupled monolithic applications. The Pub-Sub architecture is a proven and reliable approach to produce highly scalable solutions. /post/design-a-highly-scalable-publish-subscribe-solution /post/design-a-highly-scalable-publish-subscribe-solution#comment /post.aspx?id=5cf20f8b-0346-4192-a55f-0ad5b8e72344 Sat, 19 Dec 2015 00:49:00 +1300 Software Architecture Queues MSMQ Queues Messaging Jay Strydom /pingback.axd /post.aspx?id=5cf20f8b-0346-4192-a55f-0ad5b8e72344 0 /trackback.axd?id=5cf20f8b-0346-4192-a55f-0ad5b8e72344 /post/design-a-highly-scalable-publish-subscribe-solution#comment /syndication.axd?post=5cf20f8b-0346-4192-a55f-0ad5b8e72344