The Reactive Extensions (Rx) provide LINQ-style querying capabilities for events in C#, VB.Net and JavaScript. Rx implements extension methods over IObservable<T>, just as LINQ to Objects provides a set of extension methods over IEnumerable<T>.
There has been a fair amount of Microsoft publicity on the Rx library in blogs, videos and talks. Demand for Rx skills in IT jobs has grown over the last 12 months.
That said, the current implementation of the Rx library has at least a couple of issues:
Given that LINQ is based partly on higher-order functions from functional programming, perhaps it’s not surprising that F# supported querying over events back in 2006. It’s also relatively trivial to expose this functionality to C# by defining compatible extension methods using the ExtensionAttribute, e.g.:
open System
open System.Runtime.CompilerServices

[<Extension>]
type ObservableExtensions private () =
    // Each member wraps an F# Observable module function so that C# query syntax can bind to it.
    [<Extension>]
    static member Select<'T,'U>(source:IObservable<'T>, selector:Func<'T,'U>) =
        source |> Observable.map selector.Invoke
    [<Extension>]
    static member Where<'T>(source:IObservable<'T>, predicate:Func<'T,bool>) =
        source |> Observable.filter predicate.Invoke
    [<Extension>]
    static member Subscribe<'T>(source:IObservable<'T>, action:Action<'T>) =
        source |> Observable.subscribe action.Invoke
This is already enough to provide basic LINQ syntax in C# for types implementing IObservable<T>:
var leftPressedMove =
    from e in mouseMove
    where e.LeftButton == MouseButtonState.Pressed
    select e;
F# custom events implement IObservable<'T> by default, and F# provides modules with higher-order functions for both .Net events and the IObservable<'T> interface.
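As a small illustration of that point, here is a sketch that filters and maps an F# event using the built-in Observable module. The `numbers` event and the `received` list are hypothetical names made up for this example; they are not part of the library described in this post.

```fsharp
open System

// Hypothetical event source, used only for this illustration.
let numbers = Event<int>()

// Collect the results so the behaviour is easy to inspect.
let received = ResizeArray<string>()

// Publish yields an IEvent<int>, which also implements IObservable<int>,
// so the Observable module functions can be applied to it directly.
let subscription =
    numbers.Publish
    |> Observable.filter (fun n -> n % 2 = 0)
    |> Observable.map (fun n -> sprintf "even: %d" n)
    |> Observable.subscribe (fun s -> received.Add s)

[1; 2; 3; 4] |> List.iter numbers.Trigger

// Disposing the subscription removes the handler from the event.
subscription.Dispose()
numbers.Trigger 6

received |> Seq.iter (printfn "%s")
```

Only the even values triggered before the subscription is disposed should end up in `received`.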
For C# a mechanism is needed to convert .Net events to an object that implements IObservable<T>. This can be achieved fairly concisely in F# using object expressions:
let FromEvent<'TEventArgs, 'TDelegate when 'TEventArgs :> EventArgs>
    (conversion:Func<Action<'TEventArgs>,'TDelegate>,
     addHandler:Action<'TDelegate>,
     removeHandler:Action<'TDelegate>) =
    { new IObservable<'TEventArgs> with
        member this.Subscribe(observer:IObserver<_>) =
            let handler = Action<_>(observer.OnNext) |> conversion.Invoke
            addHandler.Invoke handler
            let remove () = removeHandler.Invoke handler
            { new IDisposable with member this.Dispose() = remove () }
    }
Converting from the event in C# does feel a little convoluted, though:
var mouseMove =
    Observable.FromEvent<MouseEventArgs, MouseEventHandler>(
        f => new MouseEventHandler((sender, args) => f(args)),
        handler => MouseMove += handler,
        handler => MouseMove -= handler);
Again, for C# a mechanism is required for directly creating objects that can both raise events and observe them. Rx follows the Observer pattern and provides a type called Subject<T> that implements both IObserver<T> and IObservable<T>.
Earlier in the year I put up two simple types on F# Snippets that are functionally equivalent to Rx’s Subject<T> and ReplaySubject<T>.
The ReplaySubject implementation uses F# Agents to simplify concurrency.
The types can be used easily from C#:
var s = new Subject<int>();
s.Subscribe(Console.WriteLine);
s.OnNext(1);
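To give a feel for what sits behind a type like this, here is a minimal sketch of a subject in F#. It is an assumption-laden illustration rather than the F# Snippets code: the name `SimpleSubject` is hypothetical, and it uses a plain lock instead of agents.

```fsharp
open System

/// Hypothetical minimal subject; not the F# Snippets implementation.
type SimpleSubject<'T>() =
    let sync = obj ()
    let mutable observers : IObserver<'T> list = []
    let mutable stopped = false

    /// Marks the subject as finished and returns the observers still to be notified.
    let finish () =
        lock sync (fun () ->
            let current = if stopped then [] else observers
            stopped <- true
            observers <- []
            current)

    // Public members so that C# callers can write s.OnNext(1) without a cast.
    member this.OnNext(value : 'T) =
        // Take a snapshot under the lock, then notify outside it.
        let current = lock sync (fun () -> if stopped then [] else observers)
        current |> List.iter (fun o -> o.OnNext value)

    member this.OnError(error : exn) =
        finish () |> List.iter (fun o -> o.OnError error)

    member this.OnCompleted() =
        finish () |> List.iter (fun o -> o.OnCompleted())

    member this.Subscribe(observer : IObserver<'T>) : IDisposable =
        lock sync (fun () -> observers <- observers @ [observer])
        { new IDisposable with
            member d.Dispose() =
                lock sync (fun () ->
                    observers <-
                        observers
                        |> List.filter (fun o -> not (obj.ReferenceEquals(o, observer)))) }

    interface IObserver<'T> with
        member this.OnNext value = this.OnNext value
        member this.OnError error = this.OnError error
        member this.OnCompleted() = this.OnCompleted()

    interface IObservable<'T> with
        member this.Subscribe observer = this.Subscribe observer

// Usage: values pushed after the subscription is disposed are not recorded.
let subject = SimpleSubject<int>()
let seen = ResizeArray<int>()
let subscription =
    (subject :> IObservable<int>) |> Observable.subscribe (fun n -> seen.Add n)
subject.OnNext 1
subject.OnNext 2
subscription.Dispose()
subject.OnNext 3
```

The design choice here is to copy the observer list under a lock and notify outside it, so that an observer which subscribes or unsubscribes during a notification does not deadlock or corrupt the list.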
For Silverlight and WPF we need a mechanism for invoking methods on the UI thread, which I implemented in F# back in 2010: Implementing IObservable and extending Observable
mouseMove
    .Select(e => e.GetPosition(canvas))
    .Delay(closure * 100)
    .OnDispatcher()
    .Subscribe(pos =>
    {
        Canvas.SetLeft(label, pos.X + closure * 10);
        Canvas.SetTop(label, pos.Y);
    });
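The idea behind an operator like OnDispatcher can be sketched without any UI framework by marshalling notifications through a SynchronizationContext. This swaps in a different technique from the dispatcher-based code in the linked article: `onContext` and `CountingContext` are hypothetical names, and the counting context simply runs callbacks inline so that the sketch can be run from a script.

```fsharp
open System
open System.Threading

/// Hypothetical helper (not part of the library above): delivers every
/// notification through the given SynchronizationContext.
let onContext (context : SynchronizationContext) (source : IObservable<'T>) =
    // Post asks the context to run the callback, e.g. on the UI thread.
    let post (action : unit -> unit) =
        context.Post(SendOrPostCallback(fun _ -> action ()), null)
    { new IObservable<'T> with
        member this.Subscribe(observer : IObserver<'T>) =
            source.Subscribe(
                { new IObserver<'T> with
                    member o.OnNext value = post (fun () -> observer.OnNext value)
                    member o.OnError error = post (fun () -> observer.OnError error)
                    member o.OnCompleted() = post (fun () -> observer.OnCompleted()) }) }

/// Hypothetical stand-in for a UI context: runs callbacks inline and counts them.
type CountingContext() =
    inherit SynchronizationContext()
    let mutable posts = 0
    member this.Posts = posts
    override this.Post(callback : SendOrPostCallback, state : obj) =
        posts <- posts + 1
        callback.Invoke state

let context = CountingContext()
let source = Event<int>()
let seen = ResizeArray<int>()

let subscription =
    onContext context source.Publish
    |> Observable.subscribe (fun n -> seen.Add n)

source.Trigger 10
source.Trigger 20
subscription.Dispose()
```

In a real Silverlight or WPF application the context would be the one captured on the UI thread, so the subscriber's callback could touch controls safely.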
Putting it all together, the inevitable "time flies like an arrow" Silverlight demo:
In case you’d like to have a play yourself, I’ve put up a preview release on CodePlex:
http://minirx.codeplex.com/