Events Part 3: On steroids using TPL Dataflow


For the last couple of days I have been exploring TPL Dataflow to see how it can reduce the complexity of parallel processing by doing away with callbacks and synchronization concerns such as locks and shared data. And I am loving it. I am still exploring the library, and resources and documentation on it seem a bit scarce as of now. I feel not enough developers are using it, either because they are not aware of it or because the design pattern is unfamiliar.

How can we use Dataflow to handle events?

A particular feature caught my attention, and since my last two articles were based on events, I set out to implement a solution for handling events using Dataflow. For those of you exploring DDD, I recommend taking a look at Domain Events, which help you keep the domain completely encapsulated. I like the idea of exposing an event as an object instead of a delegate. The immediate benefit I see is that the domain class no longer needs to hold a reference to the handler, and handlers need not explicitly unsubscribe from events. These were a few of the drawbacks of delegate-based events which I had pointed out earlier, and this approach addresses them nicely.

TPL Dataflow is built on top of the Task Parallel Library and so brings with it all the familiar support for tasks and asynchronous programming. It is not distributed with the .NET Framework and needs to be installed through NuGet. Look for Microsoft.Tpl.Dataflow; once it is installed you should have System.Threading.Tasks.Dataflow.dll.

Quick Intro

To get started, Dataflow provides us with blocks that behave like pipes through which data can be processed or simply flow. A block can receive data (ITargetBlock<T>), offer data (ISourceBlock<T>), or do both when the two are combined. The data can be processed by a block or simply passed through. You can push data into a block (Post(input)) and take output from it (Receive). You can connect two or more blocks by linking them (LinkTo(ITargetBlock)); that's where the analogy of pipes comes in. Once you have linked the blocks and sent data to the topmost one, you need not worry about moving the data: the links take care of propagating it from one block to the next. You can also instruct a block to stop accepting input at any time.
There are several ready-made blocks available, and you can create custom blocks too. You can find more information in Microsoft's Introduction to TPL Dataflow.
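
To make the pipe analogy concrete, here is a minimal standalone sketch (the block names are illustrative, not from my domain example) that links a TransformBlock to an ActionBlock and pushes a few numbers through:

using System;
using System.Threading.Tasks.Dataflow;

class QuickIntroSample
{
    static void Main()
    {
        // A block that processes data: doubles each incoming number.
        var doubler = new TransformBlock<int, int>(n => n * 2);

        // A block that consumes data: prints each result.
        var printer = new ActionBlock<int>(n => Console.WriteLine(n));

        // Link the blocks; completion is propagated so the pipeline shuts down cleanly.
        doubler.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

        // Push data into the topmost block; the link moves it along.
        for (int i = 1; i <= 3; i++)
            doubler.Post(i);

        // Signal that no more input is coming and wait for the pipeline to drain.
        doubler.Complete();
        printer.Completion.Wait();
    }
}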

In this article I'll hit the ground running with Dataflow by using it to handle my domain events. I will be building on top of my earlier simple domain example used here and here, borrowing the idea of a domain event and representing it explicitly through an instance.

Domain Event Interface

public interface IDomainEvent
{
    DateTime EventOccurred { get; }
}

Define my event for a product being added:

public class ProductAddedEvent : IDomainEvent
{
    public DateTime EventOccurred { get; private set; }

    public Product Product { get; private set; }

    public ProductAddedEvent(Product product)
    {
        EventOccurred = DateTime.Now;
        Product = product;
    }
}

The event is now raised by the domain object Product:

public class Product
{
    private readonly IBroadcaster _mediator;

    // Properties carried over from the earlier simple domain example.
    public string Name { get; private set; }
    public int Quantity { get; private set; }

    public Product(IBroadcaster mediator)
    {
        _mediator = mediator;
    }

    public void Add(string name, int quantity)
    {
        Name = name;
        Quantity = quantity;
        _mediator.Post(new ProductAddedEvent(this));
    }
}

As you can see, the domain no longer raises a delegate-based event; it publishes the change in state as an instance of ProductAddedEvent. There are also no static calls to raise the event; everything goes through an injected instance of IBroadcaster.

public interface IBroadcaster
{
    void Post<T>(T args) where T : IDomainEvent;
}

Anyone interested in subscribing to any type of IDomainEvent does so through a concrete instance of ISubscriber.

public interface ISubscriber
{
    IDisposable Subscribe<T>(Action<T> action)
        where T : IDomainEvent;

    void UnSubscribe(IDisposable obj);
}

OK, so how do we link all this up, and where does Dataflow fit in? Dataflow gives us a concrete BroadcastBlock<T> implementation whose sole mission is to send a copy of every published message to all linked targets. Optionally, you can supply a cloning function (Func<T,T>) that controls how that copy is offered to the targets.

We implement both interfaces in a single class to manage subscriptions and the posting of messages.

public class EventMediator : ISubscriber, IBroadcaster
{
    // The identity function is used as the cloning delegate, so every linked
    // target is offered the same event instance.
    private readonly BroadcastBlock<IDomainEvent> broadcast =
        new BroadcastBlock<IDomainEvent>(args => args);

    public void Post<T>(T args)
        where T : IDomainEvent
    {
        broadcast.Post(args);
    }

    public IDisposable Subscribe<T>(Action<T> action)
        where T : IDomainEvent
    {
        var handler = new ActionBlock<IDomainEvent>(
            args => action((T)args));

        // Only events whose runtime type matches T are forwarded to this handler.
        return broadcast.LinkTo(handler,
            e => e.GetType() == typeof(T));
    }

    public void UnSubscribe(IDisposable obj)
    {
        obj.Dispose();
    }
}

Event handlers can subscribe to the events they are interested in; a message is passed only to those handlers whose filter condition it satisfies.
Another beauty is that a handler can unlink at any time by calling UnSubscribe, which simply disposes of the IDisposable returned by the LinkTo call. This unlinks the target internally and stops future messages from being sent to that handler.

That’s it. Now we write a handler and subscribe.

public class CheckStock : IDisposable
{
    private readonly IDisposable _subscription;

    public CheckStock(ISubscriber subscriber)
    {
        _subscription = subscriber.Subscribe<ProductAddedEvent>(
            m => Handler(m));
    }

    private void Handler(ProductAddedEvent args)
    {
        Console.WriteLine("Received by CheckStock, event at {0}",
            args.EventOccurred);
        Console.WriteLine("Checking stock for {0}",
            args.Product.Name);
    }

    public void Dispose()
    {
        _subscription.Dispose();
    }
}
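
The wiring code below also uses a SendNotification handler, which is not shown here. A minimal sketch, assuming it mirrors CheckStock:

public class SendNotification : IDisposable
{
    private readonly IDisposable _subscription;

    public SendNotification(ISubscriber subscriber)
    {
        _subscription = subscriber.Subscribe<ProductAddedEvent>(
            m => Handler(m));
    }

    private void Handler(ProductAddedEvent args)
    {
        // Hypothetical handler body; the original implementation is not shown.
        Console.WriteLine("Sending notification for {0}",
            args.Product.Name);
    }

    public void Dispose()
    {
        _subscription.Dispose();
    }
}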

Stitching it all together, we can see it in action below.

var mediator = new EventMediator();

// subscribers
var stockChecker = new CheckStock(mediator);
var notify = new SendNotification(mediator);

// add an inline handler
var adHoc = mediator.Subscribe<ProductAddedEvent>(
    (m) =>
    {
        Console.WriteLine("This is an inline process for {0}",
            m.Product.Name);
    });

var product = new Product(mediator);
product.Add("Cutting edge", 10);

//notify.Dispose();
// Unsubscribe one of them
mediator.UnSubscribe(adHoc);
product = new Product(mediator);
product.Add("Bleeding edge", 10);

What we now have is completely unit testable and injectable. Those adhering to DDD get a fully encapsulated domain. Since Subscribe takes an Action<T>, you can pass in any action and assert on it when the event is raised.
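
Here is a minimal sketch of such a test (framework-neutral, using plain checks; the test name and structure are my own). Because the handler runs asynchronously on an ActionBlock, the test waits on a signal instead of asserting immediately after the call:

using System;
using System.Threading;

public class ProductTests
{
    public void Add_Publishes_ProductAddedEvent()
    {
        var mediator = new EventMediator();
        var received = new ManualResetEventSlim(false);
        ProductAddedEvent captured = null;

        // The action passed to Subscribe captures the event and signals the test.
        mediator.Subscribe<ProductAddedEvent>(e =>
        {
            captured = e;
            received.Set();
        });

        var product = new Product(mediator);
        product.Add("Cutting edge", 10);

        // Give the asynchronous handler a moment to run before asserting.
        if (!received.Wait(TimeSpan.FromSeconds(1)))
            throw new Exception("ProductAddedEvent was not received");
        if (captured.Product.Name != "Cutting edge")
            throw new Exception("Unexpected product in the event");
    }
}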

If there are more events in the domain, we simply define a new type implementing IDomainEvent:

public class ProductDeletedEvent : IDomainEvent

Assign a handler for the new event and subscribe

mediator.Subscribe<ProductDeletedEvent>(m => Handler(m));

The handlers above run as concurrent operations. You can send a message to as many handlers as required while still maintaining throughput, and Dataflow allows you to throttle the processing or buffer your messages. Just as we have multiple consumers registered with the EventMediator, we can have multiple producers sending data through the same instance.
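
As a sketch of what throttling and buffering could look like (this is an assumption, not part of the EventMediator shown above), the ActionBlock inside Subscribe can be given ExecutionDataflowBlockOptions:

public IDisposable Subscribe<T>(Action<T> action)
    where T : IDomainEvent
{
    var handler = new ActionBlock<IDomainEvent>(
        args => action((T)args),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2, // throttle: handle at most 2 events at a time
            BoundedCapacity = 100       // buffer: hold at most 100 pending events
            // Note: a BroadcastBlock only keeps its latest message, so a full
            // bounded handler may miss older messages.
        });

    return broadcast.LinkTo(handler,
        e => e.GetType() == typeof(T));
}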