Software Design Blog - Composite PatternSimple solutions to solve complex problems
/
http://www.rssboard.org/rss-specificationBlogEngine.NET 3.1.1.0en-US/opml.axdhttp://www.dotnetblogengine.net/syndication.axdJay StrydomSoftware Design Blog0.0000000.000000Broadcasting messages using the composite pattern<img src="/pics/banners/MessagePatternsBlog.jpg" class="img-responsive" alt="Broadcasting Messages">
<br>
<p>The <a href="/post/message-queue-delivery-strategies">previous post</a> focused on reading messages from a queue and pushing the messages to a single command for processing.</p>
<p><b>The problem</b> is how can we notify multiple commands to process the same request? For example, when an order is received, we may want a command to 1) send an email, 2) add an audit record and 3) call an external API.</p>
<p><b>The solution</b> is to use the composite pattern, whereby a group of commands are treated in a similar way as a single command to fan out the messages.</p>
<a href="/Downloads/MessageQueueExampleBroadcasting.zip" role="button" class="btn btn-primary btn-sm">Download Source Code</a>
<h3>Setup</h3>
<p>The core components such as the Queue Monitor were covered in the <a href="/post/message-queue-delivery-strategies">Message Queue Delivery Strategies</a> post.</p>
<h3>Sequential Processing</h3>
<p>The CompositeSequentialBroadcastCommand class below implements ICommand to indicate that it exhibits the behaviour of a command. The responsibility of the composite class is to loop though a collection of commands and call each command with the same request.</p>
<pre class="brush: c-sharp;">
public interface ICommand<in T>
{
void Execute(T message);
}
public class OrderProcessorCommand : ICommand<OrderModel>
{
private readonly int _id;
public OrderProcessorCommand(int id)
{
_id = id;
}
public void Execute(OrderModel message)
{
if (message == null) throw new ArgumentNullException("message");
var start = DateTime.Now;
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("Processed {0} by worker {1} from {2} to {3}",
message.Name, _id, start.ToString("h:mm:ss"),
DateTime.Now.ToString("h:mm:ss"));
}
}
public class CompositeSequentialBroadcastCommand<T> : ICommand<T>
{
private readonly IEnumerable<ICommand<T>> _commands;
public CompositeSequentialBroadcastCommand(IEnumerable<ICommand<T>> commands)
{
if (commands == null) throw new ArgumentNullException("commands");
_commands = commands.ToList();
}
public void Execute(T message)
{
foreach (var command in _commands)
{
command.Execute(message);
}
}
}
</pre>
Let's see what happens when 3 sequential workers are configured to process 2 messages.
<pre class="brush: c-sharp;">
using (var queue = new MessageQueue(@".\Private$\Orders"))
{
queue.Formatter = new BinaryMessageFormatter();
var workers = new List<ICommand<OrderModel>>();
for (var workerId = 1; workerId <= 3; workerId++)
{
workers.Add(new OrderProcessorCommand(workerId));
}
var command = new CompositeSequentialBroadcastCommand<OrderModel>(workers);
var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
queueMonitor.Start();
}
</pre>
<pre>Processed Order 1 by worker 1 from 6:26:34 to 6:26:36
Processed Order 1 by worker 2 from 6:26:36 to 6:26:38
Processed Order 1 by worker 3 from 6:26:38 to 6:26:40
Processed Order 2 by worker 1 from 6:26:40 to 6:26:42
Processed Order 2 by worker 2 from 6:26:42 to 6:26:44
Processed Order 2 by worker 3 from 6:26:44 to 6:26:46
</pre>
<p>The output above shows that each worker processed the same message in sequence. If worker 2 failed then worker 3 would not be executed. This can be a great pattern when the sequence is important. For example, only send the welcome email if the order has been created successfully.</p>
<h3>Parallel Processing</h3>
<p>The CompositeParallelBroadcastCommand class is similar to the previous example. The only difference is that this class will call the commands in parallel.</p>
<pre class="brush: c-sharp;">
class CompositeParallelBroadcastCommand<T> : ICommand<T>
{
private readonly IEnumerable<ICommand<T>> _commands;
public CompositeParallelBroadcastCommand(IEnumerable<ICommand<T>> commands)
{
if (commands == null) throw new ArgumentNullException("commands");
_commands = commands.ToList();
}
public void Execute(T message)
{
Parallel.ForEach(_commands, c => c.Execute(message));
}
}
</pre>
Let's see what happens when parallel 3 workers are configured to process 2 messages.
<pre class="brush: c-sharp;">
using (var queue = new MessageQueue(@".\Private$\Orders"))
{
queue.Formatter = new BinaryMessageFormatter();
var workers = new List<ICommand<OrderModel>>();
for (var workerId = 1; workerId <= 3; workerId++)
{
workers.Add(new OrderProcessorCommand(workerId));
}
var command = new CompositeParallelBroadcastCommand<OrderModel>(workers);
var queueMonitor = new QueueMonitor<OrderModel>(queue, command);
queueMonitor.Start();
}
</pre>
<pre>Processed Order 1 by worker 2 from 6:08:36 to 6:08:38
Processed Order 1 by worker 1 from 6:08:36 to 6:08:38
Processed Order 1 by worker 3 from 6:08:36 to 6:08:38
Processed Order 2 by worker 3 from 6:08:38 to 6:08:40
Processed Order 2 by worker 2 from 6:08:38 to 6:08:40
Processed Order 2 by worker 1 from 6:08:38 to 6:08:40
</pre>
<p>The output above shows that each worker processed the same message concurrently. This can be a great pattern when sequencing doesn�t matters. For example, send a welcome email and add an audit record at the same time.</p>
<h3>Summary</h3>
<p>This post focused on broadcasting a single message to multiple receivers. The composite pattern allows the behaviour of the Queue Monitor push to change without modifying the code.</p>
<b>Coming soon:</b> The next post will focus on processing a single request by dispatching messaging to a pool of competing consumer processors.
/post/broadcasting-messages-using-the-composite-pattern
[email protected]/post/broadcasting-messages-using-the-composite-pattern#comment/post.aspx?id=46b8ba45-9ca2-4e9d-b42d-b00d553f705fMon, 21 Dec 2015 23:59:00 +1300Command PatternComposite PatternQueuesCommand PatternComposite PatternQueuesDesign PatternsJay Strydom/pingback.axd/post.aspx?id=46b8ba45-9ca2-4e9d-b42d-b00d553f705f0/trackback.axd?id=46b8ba45-9ca2-4e9d-b42d-b00d553f705f/post/broadcasting-messages-using-the-composite-pattern#comment/syndication.axd?post=46b8ba45-9ca2-4e9d-b42d-b00d553f705f