The problem: how do we measure queue processing throughput? And in a production environment, how can we add a performance counter without modifying the existing infrastructure?
The solution is to apply the decorator pattern. A decorator sits between the QueueMonitor class and the actual command processing class, intercepts each call, and adds the instrumentation.
The previous post used the competing consumer pattern to process a queue concurrently. The Message Queue Delivery Strategies post used a single processor to process a queue.
In this post, we will add instrumentation to the single queue processor and the competing consumer queue processor to compare performance.
Setup
The experiment will add 512KB messages to a queue to simulate a decent workload. The intention is to read the messages from the queue and pass them to a file-based SMTP mailer that writes the emails to disk as fast as possible.
The core components such as the Queue Monitor were covered in the Message Queue Delivery Strategies post. The queue monitor is responsible for reading messages from the queue and pushing the message content to a command to execute.
The classes that will be used in this experiment are listed below.
    using System;
    using System.Net.Mail;

    public class FileSmtpMailer : IMailer
    {
        private readonly string _path;

        public FileSmtpMailer(string path)
        {
            if (path == null) throw new ArgumentNullException("path");
            _path = path;
        }

        public void Send(string message, string sender, string recipients, string subject)
        {
            using (var client = new SmtpClient())
            {
                // Write each email as a file in _path instead of sending it over the network.
                // This can be configured in the app.config
                client.DeliveryMethod = SmtpDeliveryMethod.SpecifiedPickupDirectory;
                client.PickupDirectoryLocation = _path;

                using (var mailMessage = new MailMessage(sender, recipients,
                                                         subject, message))
                {
                    mailMessage.IsBodyHtml = true;
                    client.Send(mailMessage);
                }
            }
        }
    }
    public class MailProcessorCommand : ICommand<OrderMessage>
    {
        private readonly IMailer _mailer;

        public MailProcessorCommand(IMailer mailer)
        {
            if (mailer == null) throw new ArgumentNullException("mailer");
            _mailer = mailer;
        }

        public void Execute(OrderMessage message)
        {
            if (message == null) throw new ArgumentNullException("message");
            _mailer.Send(message.Body, message.Sender,
                         message.Recipients, message.Subject);
        }
    }
Instrumentation
The TimerCommandDecorator will be placed between the QueueMonitor and the MailProcessorCommand in order to measure throughput.
    using System;
    using System.Diagnostics;

    public class TimerCommandDecorator<T> : ICommand<T>
    {
        private readonly ICommand<T> _next;
        // Not synchronized: assumes Execute is called by one thread at a time.
        private int _messages;
        private Stopwatch _stopwatch;

        public TimerCommandDecorator(ICommand<T> next)
        {
            if (next == null) throw new ArgumentNullException("next");
            _next = next;
        }

        public void Execute(T message)
        {
            // Start timing when the first message arrives.
            if (_stopwatch == null) _stopwatch = Stopwatch.StartNew();

            _next.Execute(message);
            _messages += 1;

            // Report the running total and the elapsed time after every message.
            Console.WriteLine("Processing {0} messages took {1} sec",
                              _messages, _stopwatch.Elapsed.TotalSeconds);
        }
    }
The TimerCommandDecorator implementation can easily be swapped out with a perfmon version that will increment the performance counter for each message.
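As a rough illustration of what such a perfmon version might look like, here is a minimal sketch built on System.Diagnostics.PerformanceCounter. The class names, category name and counter name are assumptions made for this example and are not part of the downloadable source. The ICommand<T> interface is repeated only so the snippet stands alone. Performance counters are Windows-specific, and creating a category usually requires administrative rights, so in practice that step tends to live in an installer.

```csharp
using System;
using System.Diagnostics;

// Same shape as the ICommand<T> used in this post, repeated so the sketch compiles on its own.
public interface ICommand<T>
{
    void Execute(T message);
}

// Hypothetical decorator: increments a performance counter for each processed message.
public class PerformanceCounterCommandDecorator<T> : ICommand<T>
{
    private readonly ICommand<T> _next;
    private readonly PerformanceCounter _counter;

    public PerformanceCounterCommandDecorator(ICommand<T> next, PerformanceCounter counter)
    {
        if (next == null) throw new ArgumentNullException("next");
        if (counter == null) throw new ArgumentNullException("counter");
        _next = next;
        _counter = counter;
    }

    public void Execute(T message)
    {
        _next.Execute(message);
        // Increment is an atomic operation, so no extra locking is needed here.
        _counter.Increment();
    }
}

public static class QueueCounters
{
    // Assumed names, chosen only for this example.
    public const string Category = "Queue Processing";
    public const string MessagesPerSecond = "Messages processed / sec";

    // Creates the category on first use and returns a writable counter.
    public static PerformanceCounter Create()
    {
        if (!PerformanceCounterCategory.Exists(Category))
        {
            var counters = new CounterCreationDataCollection();
            counters.Add(new CounterCreationData(
                MessagesPerSecond,
                "Messages processed per second",
                PerformanceCounterType.RateOfCountsPerSecond32));

            PerformanceCounterCategory.Create(
                Category,
                "Queue processing throughput",
                PerformanceCounterCategoryType.SingleInstance,
                counters);
        }

        return new PerformanceCounter(Category, MessagesPerSecond, false);
    }
}
```

Swapping it in would then be a matter of replacing the TimerCommandDecorator in the wiring code with new PerformanceCounterCommandDecorator<OrderMessage>(command, QueueCounters.Create()) and watching the counter in perfmon.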
Single vs Competing Consumers
Let's run the experiment and compare the throughput of the single processor and the competing consumer receivers when each processes 1000 messages.
The single processor version:
    using (var queue = new MessageQueue(QueuePath))
    {
        queue.Formatter = new BinaryMessageFormatter();

        var message = new OrderMessage()
        {
            // Placeholder addresses
            Recipients = "recipient@example.com",
            Sender = "sender@example.com",
            Subject = "Hello World",
            Body = emailBody // loaded from a file
        };

        // Send messages to the queue
        for (var i = 0; i < 1000; i++)
        {
            queue.Send(message);
        }

        // Wire up: QueueMonitor -> TimerCommandDecorator -> MailProcessorCommand
        var fileMailer = new FileSmtpMailer(@"C:\temp");
        var command = new MailProcessorCommand(fileMailer);
        var timerCommand = new TimerCommandDecorator<OrderMessage>(command);
        var queueMonitor = new QueueMonitor<OrderMessage>(queue, timerCommand);
        queueMonitor.Start();
    }
Processing 1000 messages took 47.3004132 sec
The competing consumer processor version with 10 processors:
    // Create 10 processors
    var commands = new List<ICommand<OrderMessage>>();
    for (var i = 0; i < 10; i++)
    {
        commands.Add(new MailProcessorCommand(fileMailer));
    }

    var command = new CompositeCompetingCommandProcessor<OrderMessage>(commands);
    var timerCommand = new TimerCommandDecorator<OrderMessage>(command);
    var queueMonitor = new QueueMonitor<OrderMessage>(queue, timerCommand);
    queueMonitor.Start();
Processing 1000 messages took 16.241723 sec
Success! The competing consumer pattern processed the mail queue roughly 3 times faster (16.2 sec versus 47.3 sec).
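To put numbers on that comparison, the two timings above can be converted into messages per second and a speedup factor. The helper below is only a small worked calculation. The ThroughputMath class and its method names are made up for this example.

```csharp
using System;
using System.Globalization;

// Hypothetical helper for turning the measured timings into throughput figures.
public static class ThroughputMath
{
    public static double MessagesPerSecond(int messages, double elapsedSeconds)
    {
        return messages / elapsedSeconds;
    }

    public static double Speedup(double baselineSeconds, double improvedSeconds)
    {
        return baselineSeconds / improvedSeconds;
    }

    public static void Main()
    {
        const int messages = 1000;
        const double singleSeconds = 47.3004132;   // single processor run
        const double competingSeconds = 16.241723; // 10 competing consumers run

        var culture = CultureInfo.InvariantCulture;
        Console.WriteLine("Single:    {0} msg/sec",
            MessagesPerSecond(messages, singleSeconds).ToString("F1", culture));    // 21.1
        Console.WriteLine("Competing: {0} msg/sec",
            MessagesPerSecond(messages, competingSeconds).ToString("F1", culture)); // 61.6
        Console.WriteLine("Speedup:   {0}x",
            Speedup(singleSeconds, competingSeconds).ToString("F1", culture));      // 2.9
    }
}
```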
We could experiment using various numbers of concurrent receivers to determine how performance will be affected.
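One way such an experiment could be structured is sketched below. To keep the snippet self-contained it does not use the QueueMonitor or CompositeCompetingCommandProcessor from this post. Instead it swaps in Parallel.ForEach with MaxDegreeOfParallelism as a stand-in for the number of consumers, and a short Thread.Sleep as a simulated I/O-bound workload. Both are assumptions for illustration, so the numbers it prints say nothing about the real mail queue.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumerCountExperiment
{
    // Stand-in for MailProcessorCommand.Execute: simulates a short I/O-bound call.
    private static void ProcessMessage(int message)
    {
        Thread.Sleep(5);
    }

    // Processes messageCount simulated messages with at most `consumers` running at once.
    public static TimeSpan Run(int messageCount, int consumers)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = consumers };
        var stopwatch = Stopwatch.StartNew();

        Parallel.ForEach(Enumerable.Range(0, messageCount), options,
            message => ProcessMessage(message));

        stopwatch.Stop();
        return stopwatch.Elapsed;
    }

    public static void Main()
    {
        const int messageCount = 200;

        foreach (var consumers in new[] { 1, 2, 5, 10, 20 })
        {
            var elapsed = Run(messageCount, consumers);
            Console.WriteLine("{0,2} consumers: {1:F2} sec, {2:F1} msg/sec",
                consumers, elapsed.TotalSeconds, messageCount / elapsed.TotalSeconds);
        }
    }
}
```

Against the real queue, the same loop would rebuild the list of MailProcessorCommand instances with a different count on each pass and refill the queue before every run so that each configuration processes the same workload.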
Note: The disk was saturated with requests and was running at 100% capacity, which likely explains why 10 processors were roughly 3 times faster rather than 10 times faster.
The competing consumer pattern is a great option to fully utilize resources but it may have a negative impact on other tenants of the system. For example, if we call an external API with too many requests, it may appear like a denial-of-service attack.
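If a downstream resource needs protecting, one option is yet another decorator that caps how many calls are in flight at once. The sketch below uses SemaphoreSlim for that. ThrottledCommandDecorator and the maxConcurrency parameter are hypothetical names, not part of the original source, and the ICommand<T> interface is repeated only so the snippet stands alone. Note that this limits concurrency, not requests per second, so a strict rate limit would need a different mechanism.

```csharp
using System;
using System.Threading;

// Same shape as the ICommand<T> used in this post, repeated so the sketch compiles on its own.
public interface ICommand<T>
{
    void Execute(T message);
}

// Hypothetical decorator: allows at most maxConcurrency calls to the inner command at a time.
public class ThrottledCommandDecorator<T> : ICommand<T>
{
    private readonly ICommand<T> _next;
    private readonly SemaphoreSlim _slots;

    public ThrottledCommandDecorator(ICommand<T> next, int maxConcurrency)
    {
        if (next == null) throw new ArgumentNullException("next");
        if (maxConcurrency < 1) throw new ArgumentOutOfRangeException("maxConcurrency");
        _next = next;
        _slots = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    }

    public void Execute(T message)
    {
        // Blocks the calling consumer while maxConcurrency calls are already in flight.
        _slots.Wait();
        try
        {
            _next.Execute(message);
        }
        finally
        {
            // Always free the slot, even when the inner command throws.
            _slots.Release();
        }
    }
}
```

For the cap to apply across all consumers, they would need to share a single decorator instance, since each instance owns its own semaphore.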
Summary
The advantage of the competing consumers pattern is the flexibility to configure the number of concurrent consumers to produce a balanced outcome. It is, however, difficult to determine the throughput and performance impact without instrumentation in place to measure resource utilization.
This post covered a decorator-based approach to adding performance instrumentation without changing the existing classes. The metrics showed that the competing consumer queue processor improved queue processing throughput roughly threefold over the single queue processor.