Phillip Trelford's Array

POKE 36879,255

Mini Rx: Observable Extensions

The Reactive Extensions (Rx) provide LINQ-style querying capabilities for events in C#, VB.Net and JavaScript. Rx implements extension methods over IObservable<T>, just as LINQ to Objects provides a set of extension methods over IEnumerable<T>.


There has been a fair amount of Microsoft publicity on the Rx library in blogs, videos and talks. Demand for Rx skills in IT jobs has grown over the last 12 months. 

Reactive Extensions Demand Trend

That said, the current implementation of the Rx library has at least a couple of issues:

Given that LINQ is based partly on higher-order functions from functional programming, perhaps it’s not surprising that F# supported querying over events back in 2006. It’s also relatively trivial to expose this functionality to C# by defining compatible extension methods using the ExtensionAttribute, e.g.

open System
open System.Runtime.CompilerServices

[<Extension>]
type ObservableExtensions private () =
   [<Extension>]
   static member Select<'T,'U>(source:IObservable<'T>,selector:Func<'T,'U>) =
       source |> Observable.map selector.Invoke
   [<Extension>]
   static member Where<'T>(source:IObservable<'T>,predicate:Func<'T,bool>) =
       source |> Observable.filter predicate.Invoke
   [<Extension>]
   static member Subscribe<'T>(source:IObservable<'T>, action:Action<'T>) =
       source |> Observable.subscribe action.Invoke

This is already enough to provide basic LINQ syntax in C# for types implementing IObservable<T>:

var leftPressedMove =
    from e in mouseMove
    where e.LeftButton == MouseButtonState.Pressed
    select e;

F# custom events implement IObservable<'T> by default and F# provides modules with higher-order functions for both .Net Events and the IObservable<'T> interface.
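As a small illustration of that point, here is a minimal sketch that filters and maps an F# event with the Observable module. The names tick and received are made up for this example and are not from the original code.

```fsharp
open System

// Hypothetical event source for illustration only.
let tick = Event<int>()

// Collects whatever reaches the subscriber so the result can be inspected.
let received = ResizeArray<int>()

// tick.Publish is an IEvent<int>, which also implements IObservable<int>,
// so the Observable module functions apply to it directly.
let subscription =
    tick.Publish
    |> Observable.filter (fun n -> n % 2 = 0)
    |> Observable.map (fun n -> n * 10)
    |> Observable.subscribe received.Add

// Only the even values pass the filter and reach the subscriber.
[1; 2; 3; 4] |> List.iter tick.Trigger

// Disposing the subscription detaches the handler,
// so this last trigger is not observed.
subscription.Dispose()
tick.Trigger 6
```

After this runs, received should hold 20 and 40 only.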

For C# a mechanism is needed to convert .Net events to an object that implements IObservable<T>. This can be achieved fairly concisely in F# using object expressions:

let FromEvent<'TEventArgs, 'TDelegate when 'TEventArgs:> EventArgs>
        (conversion:Func<Action<'TEventArgs>,'TDelegate>,
         addHandler:Action<'TDelegate>,
         removeHandler:Action<'TDelegate>)  =
    { new IObservable<'TEventArgs> with
        member this.Subscribe(observer:IObserver<_>) =
            // Wrap OnNext in the event's own delegate type
            let handler = Action<_>(observer.OnNext) |> conversion.Invoke
            addHandler.Invoke handler
            let remove () = removeHandler.Invoke handler
            { new IDisposable with member this.Dispose() = remove () }
    }

Converting from the event in C# does feel a little convoluted, though:

var mouseMove =
    Observable.FromEvent<MouseEventArgs, MouseEventHandler>(
        f => new MouseEventHandler((sender, args) => f(args)),
        handler => MouseMove += handler,
        handler => MouseMove -= handler);

Again for C# a mechanism is required for directly creating objects that can be both a source of events and be used to observe events. Rx follows the Observer pattern and provides a type called Subject that implements both IObserver<T> and IObservable<T>.
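To make the idea concrete, here is a rough sketch of what a minimal subject could look like in F#. It is an illustration only, not the code published on F# Snippets: the name SimpleSubject is hypothetical, and it ignores thread safety and does not track completed or errored state.

```fsharp
open System

// Illustrative sketch only: a minimal, non-thread-safe subject.
type SimpleSubject<'T>() =
    let observers = ResizeArray<IObserver<'T>>()
    interface IObserver<'T> with
        member this.OnNext(value) =
            // Iterate over a copy so observers may unsubscribe while being notified.
            for o in observers.ToArray() do o.OnNext value
        member this.OnError(error) =
            for o in observers.ToArray() do o.OnError error
        member this.OnCompleted() =
            for o in observers.ToArray() do o.OnCompleted()
    interface IObservable<'T> with
        member this.Subscribe(observer) =
            observers.Add observer
            { new IDisposable with
                member __.Dispose() = observers.Remove observer |> ignore }

// Usage: push values in through IObserver, receive them through IObservable.
let subject = SimpleSubject<int>()
let seen = ResizeArray<int>()
let sub = (subject :> IObservable<int>) |> Observable.subscribe seen.Add
(subject :> IObserver<int>).OnNext 1
(subject :> IObserver<int>).OnNext 2
sub.Dispose()
(subject :> IObserver<int>).OnNext 3   // not observed: already unsubscribed
```

The explicit casts are needed because F# implements interfaces explicitly. From C# the same members would be called on the object directly.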

Earlier in the year I put up two simple types on F# Snippets that are functionally equivalent to Rx’s Subject<T> and ReplaySubject<T>.

The ReplaySubject implementation uses F# Agents to simplify concurrency.

The types can be used easily from C#:

var s = new Subject<int>();

For Silverlight and WPF we need a mechanism for invoking methods on the UI thread, which I implemented in F# back in 2010: Implementing IObservable and extending Observable 

mouseMove
    .Select(e => e.GetPosition(canvas))
    .Delay(closure * 100)
    .Subscribe(pos =>
    {
        Canvas.SetLeft(label, pos.X + closure * 10);
        Canvas.SetTop(label, pos.Y);
    });
Putting it all together, the inevitable "time flies like an arrow" Silverlight demo:

In case you’d like to have a play yourself, I’ve put up a preview release on CodePlex:


Playable Silverlight mini-game implemented with a couple of hundred lines of F#!

Keys: Left Arrow = Left, Right Arrow = Right, Up Arrow = Rotate, Down Arrow = Drop


Can also be run as an editable script inside

The main game loop:

let rec gameLoop () =  async {
    do! prompt "Click To Start" awaitingClick
    do! inGameLoop ()
    do! prompt "Game Over" paused
    return! gameLoop ()
    }

The game uses F#’s Asynchronous Workflows to simplify the asynchronous flow. The game loop above reads as if it runs synchronously, but wrapping the code in an async block lets it run asynchronously!
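For anyone who wants to try the pattern outside the game, here is a minimal sketch of the same shape. The helpers step and rounds and the list log are hypothetical stand-ins for the game's own functions, and the loop is bounded so that the script terminates.

```fsharp
open System

// Records each step so the order of execution can be inspected.
let log = ResizeArray<string>()

// Hypothetical stand-in for a game phase: waits asynchronously, then records its name.
let step (name:string) = async {
    do! Async.Sleep 10   // suspends the workflow without blocking a thread
    log.Add name
    }

// Same shape as the game loop, but counted down so it finishes.
let rec rounds n = async {
    if n > 0 then
        do! step "Click To Start"
        do! step "Playing"
        do! step "Game Over"
        return! rounds (n - 1)
    }

// A console script can simply wait for the workflow to finish.
rounds 2 |> Async.RunSynchronously
```

In a UI application the workflow would more likely be started with Async.StartImmediate from the UI thread rather than run synchronously. That is an assumption about typical usage, not a description of the game's code.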


DDD Scotland

This weekend I made my first ever trip up to Glasgow to attend and present at the DDD Scotland conference, an event run by the community with the sessions voted for by the community.

I arrived on Friday afternoon in time for a quick walk around Glasgow’s rather impressive city centre before the speakers’ dinner. It appears I may not have been the only visitor from out-of-town on the day ;)

Police Box

The event

I started the morning with Ray Booysen’s excellent talk: Streams of Streams – Your Rx Prescription. Ray gave a great introduction to the library with plenty of live examples inside LINQPad. The Reactive Extensions (Rx) is basically LINQ for events.

I then shared the speakers’ room with Gary Short’s Ego before my talk on BDD with F#:

Code samples:

The event itself was not recorded but the following videos present some similar material:

My afternoon started with Paul Stack’s session SpecFlow - functional testing made easy. Paul gave a great intro to automating acceptance tests for web apps using SpecFlow and Watir.

Then some important messages in Richard Dalton’s excellent session: How to build a framework, and why you almost never should. A quote from Richard’s slides:

Base the Framework on existing code, preferably multiple cases. Before Reuse… there must BE USE.

Thanks to all the organizers, speakers and attendees for making this a really fun weekend.