Software Design Blog - Composite Pattern Simple solutions to solve complex problems / http://www.rssboard.org/rss-specification BlogEngine.NET 3.1.1.0 en-US /opml.axd http://www.dotnetblogengine.net/syndication.axd Jay Strydom Software Design Blog 0.000000 0.000000 Broadcasting messages using the composite pattern <img src="/pics/banners/MessagePatternsBlog.jpg" class="img-responsive" alt="Broadcasting Messages"> <br> <p>The <a href="/post/message-queue-delivery-strategies">previous post</a> focused on reading messages from a queue and pushing the messages to a single command for processing.</p> <p><b>The problem</b> is how can we notify multiple commands to process the same request? For example, when an order is received, we may want a command to 1) send an email, 2) add an audit record and 3) call an external API.</p> <p><b>The solution</b> is to use the composite pattern, whereby a group of commands are treated in a similar way as a single command to fan out the messages.</p> <a href="/Downloads/MessageQueueExampleBroadcasting.zip" role="button" class="btn btn-primary btn-sm">Download Source Code</a> <h3>Setup</h3> <p>The core components such as the Queue Monitor were covered in the <a href="/post/message-queue-delivery-strategies">Message Queue Delivery Strategies</a> post.</p> <h3>Sequential Processing</h3> <p>The CompositeSequentialBroadcastCommand class below implements ICommand to indicate that it exhibits the behaviour of a command. The responsibility of the composite class is to loop though a collection of commands and call each command with the same request.</p> <pre class="brush: c-sharp;"> public interface ICommand&lt;in T&gt; { void Execute(T message); } public class OrderProcessorCommand : ICommand&lt;OrderModel&gt; { private readonly int _id; public OrderProcessorCommand(int id) { _id = id; } public void Execute(OrderModel message) { if (message == null) throw new ArgumentNullException("message"); var start = DateTime.Now; Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Processed {0} by worker {1} from {2} to {3}", message.Name, _id, start.ToString("h:mm:ss"), DateTime.Now.ToString("h:mm:ss")); } } public class CompositeSequentialBroadcastCommand&lt;T&gt; : ICommand&lt;T&gt; { private readonly IEnumerable&lt;ICommand&lt;T&gt;&gt; _commands; public CompositeSequentialBroadcastCommand(IEnumerable&lt;ICommand&lt;T&gt;&gt; commands) { if (commands == null) throw new ArgumentNullException("commands"); _commands = commands.ToList(); } public void Execute(T message) { foreach (var command in _commands) { command.Execute(message); } } } </pre> Let's see what happens when 3 sequential workers are configured to process 2 messages. <pre class="brush: c-sharp;"> using (var queue = new MessageQueue(@".\Private$\Orders")) { queue.Formatter = new BinaryMessageFormatter(); var workers = new List&lt;ICommand&lt;OrderModel&gt;&gt;(); for (var workerId = 1; workerId &lt;= 3; workerId++) { workers.Add(new OrderProcessorCommand(workerId)); } var command = new CompositeSequentialBroadcastCommand&lt;OrderModel&gt;(workers); var queueMonitor = new QueueMonitor&lt;OrderModel&gt;(queue, command); queueMonitor.Start(); } </pre> <pre>Processed Order 1 by worker 1 from 6:26:34 to 6:26:36 Processed Order 1 by worker 2 from 6:26:36 to 6:26:38 Processed Order 1 by worker 3 from 6:26:38 to 6:26:40 Processed Order 2 by worker 1 from 6:26:40 to 6:26:42 Processed Order 2 by worker 2 from 6:26:42 to 6:26:44 Processed Order 2 by worker 3 from 6:26:44 to 6:26:46 </pre> <p>The output above shows that each worker processed the same message in sequence. If worker 2 failed then worker 3 would not be executed. This can be a great pattern when the sequence is important. For example, only send the welcome email if the order has been created successfully.</p> <h3>Parallel Processing</h3> <p>The CompositeParallelBroadcastCommand class is similar to the previous example. The only difference is that this class will call the commands in parallel.</p> <pre class="brush: c-sharp;"> class CompositeParallelBroadcastCommand&lt;T&gt; : ICommand&lt;T&gt; { private readonly IEnumerable&lt;ICommand&lt;T&gt;&gt; _commands; public CompositeParallelBroadcastCommand(IEnumerable&lt;ICommand&lt;T&gt;&gt; commands) { if (commands == null) throw new ArgumentNullException("commands"); _commands = commands.ToList(); } public void Execute(T message) { Parallel.ForEach(_commands, c =&gt; c.Execute(message)); } } </pre> Let's see what happens when parallel 3 workers are configured to process 2 messages. <pre class="brush: c-sharp;"> using (var queue = new MessageQueue(@".\Private$\Orders")) { queue.Formatter = new BinaryMessageFormatter(); var workers = new List&lt;ICommand&lt;OrderModel&gt;&gt;(); for (var workerId = 1; workerId &lt;= 3; workerId++) { workers.Add(new OrderProcessorCommand(workerId)); } var command = new CompositeParallelBroadcastCommand&lt;OrderModel&gt;(workers); var queueMonitor = new QueueMonitor&lt;OrderModel&gt;(queue, command); queueMonitor.Start(); } </pre> <pre>Processed Order 1 by worker 2 from 6:08:36 to 6:08:38 Processed Order 1 by worker 1 from 6:08:36 to 6:08:38 Processed Order 1 by worker 3 from 6:08:36 to 6:08:38 Processed Order 2 by worker 3 from 6:08:38 to 6:08:40 Processed Order 2 by worker 2 from 6:08:38 to 6:08:40 Processed Order 2 by worker 1 from 6:08:38 to 6:08:40 </pre> <p>The output above shows that each worker processed the same message concurrently. This can be a great pattern when sequencing doesn�t matters. For example, send a welcome email and add an audit record at the same time.</p> <h3>Summary</h3> <p>This post focused on broadcasting a single message to multiple receivers. The composite pattern allows the behaviour of the Queue Monitor push to change without modifying the code.</p> <b>Coming soon:</b> The next post will focus on processing a single request by dispatching messaging to a pool of competing consumer processors. /post/broadcasting-messages-using-the-composite-pattern [email protected] /post/broadcasting-messages-using-the-composite-pattern#comment /post.aspx?id=46b8ba45-9ca2-4e9d-b42d-b00d553f705f Mon, 21 Dec 2015 23:59:00 +1300 Command Pattern Composite Pattern Queues Command Pattern Composite Pattern Queues Design Patterns Jay Strydom /pingback.axd /post.aspx?id=46b8ba45-9ca2-4e9d-b42d-b00d553f705f 0 /trackback.axd?id=46b8ba45-9ca2-4e9d-b42d-b00d553f705f /post/broadcasting-messages-using-the-composite-pattern#comment /syndication.axd?post=46b8ba45-9ca2-4e9d-b42d-b00d553f705f