Yet another Event Aggregator (using Rx)

11.03.2011 00:18Comments
I was really interested in this post from Jose Romaniello about Event aggregators in Rx, which looks like the perfect fit, and Keith Woods has his own version in this other post. So  why am I doing yet another post on Event aggregation using Rx? I want to show my (much) simpler version of it, and how you can find different ways to do the same thing in Rx. Here’s Jose’s definition of an Event aggregator:
Code Snippet
  1. public interface IEventPublisher
  2. {
  3.     void Publish<TEvent>(TEvent sampleEvent);
  4.     IObservable<TEvent> GetEvent<TEvent>();
  5. }
So you can get an IObservable<T> of a certain type of events using the GetEvent method, and you can post events of a certain type using the Publish method. One of the problems with most of the samples that I’ve seen around, is that they don’t handle inheritance very well. So if you wanted to do GetEvent of the Type “ValidationEvent” which is a base class for all validation events, that’s not very well supported because you would need to specify the base class in the Publish method:
Code Snippet
  1. var aggregator = new EventAggregator();
  2. aggregator.GetEvent<ValidationEvent>().Subscribe(x => Console.WriteLine("Validation event: {0}", x.Message));
  3. aggregator.Publish<ValidationEvent>(new InsufficientFounds { User = "John Doe", RequestedAmount = 200 });
  4. Console.ReadKey();
Notice how you specify “ValidationEvent” in the Publish generic parameter, but you pass an “InsufficientFounds”. I wanted to support inheritance without the need to specify the base class, so I thought I’d share my version of an Event aggregator that handles inheritance, which turned out to be even simpler.
Code Snippet
  1. public class EventAggregator : IEventPublisher
  2. {
  3.     private ISubject<object> _subject = new Subject<object>();
  4.     public IObservable<T> GetEvent<T>()
  5.     {
  6.         return _subject.AsObservable().OfType<T>();
  7.     }
  8.  
  9.     public void Publish<TEvent>(TEvent sampleEvent)
  10.     {
  11.         _subject.OnNext(sampleEvent);
  12.     }
  13. }
Notice the use of the “OfType” operator that filters the events based on the desired type. Let’s test this with three different events:
Code Snippet
  1. public class ValidationEvent
  2. {
  3.     public virtual string Message { get; set; }
  4. }
  5.  
  6. public class InsufficientFounds : ValidationEvent
  7. {
  8.     public override string Message
  9.     {
  10.         get
  11.         {
  12.             return string.Format("User {0} does not have enough money to pay {1}", User, RequestedAmount);
  13.         }
  14.         set
  15.         {
  16.             throw new NotImplementedException();
  17.         }
  18.     }
  19.     public string User { get; set; }
  20.     public decimal RequestedAmount { get; set; }
  21. }
  22.  
  23. public class SampleEvent
  24. {
  25.     public string Message { get; set; }
  26. }
And now we can react to all the events derived from ValidationEvent like this:
Code Snippet
  1. [TestMethod]
  2. public void GetEvent_WhenUsingType_PublishesDerivedTypes()
  3. {
  4.     ValidationEvent published = null;
  5.     var aggregator = new EventAggregator();
  6.     aggregator.GetEvent<ValidationEvent>().Subscribe(x => published = x);
  7.     
  8.     aggregator.Publish(new InsufficientFounds { User = "John Doe", RequestedAmount = 200 });
  9.  
  10.     Assert.IsInstanceOfType(published, typeof(InsufficientFounds));
  11. }
So even though you are asking for ValidationEvents, the aggregator raises events of derived types. In the case of events that are not derived, the event will not be raised:
Code Snippet
  1. [TestMethod]
  2. public void GetEvent_WhenUsingType_NoneAssignableTypesAreRaised()
  3. {
  4.     string sampleEvent = "Sample event";
  5.     ValidationEvent published = null;
  6.     var aggregator = new EventAggregator();
  7.     aggregator.GetEvent<ValidationEvent>().Subscribe(x => published = x);
  8.     
  9.     aggregator.Publish(sampleEvent); //non-ValidationEvent
  10.  
  11.     Assert.IsNull(published);
  12. }
This demonstrate the power behind the Reactive Extensions and it’s operators.

comments powered by Disqus