Last week Gian Ntzik gave a great talk at the F#unctional Londoners meetup on the Nessos Streams library. It’s a lightweight F#/C# library for efficient functional-style pipelines on streams of data.
The main difference between LINQ/Seq and Streams is that LINQ is about composing external iterators (Enumerable/Enumerator) and Streams is based on the continuation-passing-style composition of internal iterators, which makes optimisations such as loop fusion easier.
The slides (using FsReveal) and samples are available on Gian’s GitHub repository.
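To make the external versus internal iterator distinction concrete, here is a minimal sketch that sums the same data both ways. The names sumPull, pushArray and sumPush are made up for this illustration and are not part of Nessos Streams or the talk's samples.

```fsharp
// Pull (external iterator): the consumer drives the loop, asking the
// enumerator for one element at a time via MoveNext/Current.
let sumPull (source: seq<int64>) : int64 =
    use e = source.GetEnumerator()
    let mutable acc = 0L
    while e.MoveNext() do
        acc <- acc + e.Current
    acc

// Push (internal iterator): the producer owns the loop and hands each
// element to a continuation k supplied by the consumer.
// (pushArray is a hypothetical helper, not a library function.)
let pushArray (source: int64[]) (k: int64 -> unit) : unit =
    for x in source do
        k x

let sumPush (source: int64[]) : int64 =
    let acc = ref 0L
    pushArray source (fun x -> acc.Value <- acc.Value + x)
    acc.Value
```

In the pull version every stage of a pipeline would need its own enumerator object. In the push version each stage is just a function wrapped around the next continuation, which is what gives the compiler a chance to inline the stages into one loop.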
Simple Streams
Gian started the session by live coding a simple implementation of streams in about 20 minutes:
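open System.Collections.Generic // for List<'T>, used by toArray
// The samples further down call these functions as Stream.ofArray, Stream.filter, etc.,
// so they are assumed to live in a module named Stream.
// A stream is a function that pushes each of its elements to the continuation k.
type Stream<'T> = ('T -> unit) -> unit
let inline ofArray (source: 'T[]) : Stream<'T> =
    fun k ->
        let mutable i = 0
        while i < source.Length do
            k source.[i]
            i <- i + 1
// Only forwards the values that satisfy the predicate.
let inline filter (predicate: 'T -> bool) (stream: Stream<'T>) : Stream<'T> =
    fun k -> stream (fun value -> if predicate value then k value)
// Transforms each value before forwarding it.
let inline map (mapF: 'T -> 'U) (stream: Stream<'T>) : Stream<'U> =
    fun k -> stream (fun v -> k (mapF v))
// Runs the stream, applying iterF to every value.
let inline iter (iterF: 'T -> unit) (stream: Stream<'T>) : unit =
    stream (fun v -> iterF v)
let inline toArray (stream: Stream<'T>) : 'T [] =
    let acc = new List<'T>()
    stream |> iter (fun v -> acc.Add(v))
    acc.ToArray()
// Runs the stream, threading an accumulator through foldF.
let inline fold (foldF:'State->'T->'State) (state:'State) (stream:Stream<'T>) =
    let acc = ref state
    stream (fun v -> acc := foldF !acc v)
    !acc
// Note: reduce is seeded with the generic zero of ^T.
let inline reduce (reducer: ^T -> ^T -> ^T) (stream: Stream< ^T >) : ^T
        when ^T : (static member Zero : ^T) =
    fold (fun s v -> reducer s v) LanguagePrimitives.GenericZero stream
let inline sum (stream : Stream< ^T>) : ^T
        when ^T : (static member Zero : ^T)
        and ^T : (static member (+) : ^T * ^T -> ^T) =
    fold (+) LanguagePrimitives.GenericZero stream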
As you can see, the whole thing is only about 40 lines of code.
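Because every combinator above is marked inline and simply wraps the next continuation, a filter/map/sum pipeline over an array can, roughly speaking, collapse into a single loop. The function below is a hand-written sketch of that fused loop, not output from the compiler or the JIT, and the name fusedSumOfEvenSquares is made up for this illustration.

```fsharp
// Hand-written equivalent of the pipeline
//   ofArray data |> filter (fun x -> x % 2L = 0L) |> map (fun x -> x * x) |> sum
// with all stages merged into one loop and no intermediate collections.
let fusedSumOfEvenSquares (data: int64[]) : int64 =
    let mutable acc = 0L
    let mutable i = 0
    while i < data.Length do
        let x = data.[i]
        if x % 2L = 0L then        // filter stage
            acc <- acc + x * x     // map stage, then sum stage
        i <- i + 1
    acc

printfn "%d" (fusedSumOfEvenSquares [| 1L .. 10L |]) // prints 220
```

How close the generated code gets to this depends on how much the F# compiler and the JIT manage to inline, which would help explain why the timings below vary between 32-bit and 64-bit runs.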
Sequential Performance
Just with this simple implementation, Gian was able to demonstrate a significant performance improvement over F#’s built-in Seq module for a simple pipeline:
#time // Turns on timing in F# Interactive
let data = [|1L..1000000L|]
let seqValue =
    data
    |> Seq.filter (fun x -> x%2L = 0L)
    |> Seq.map (fun x -> x * x)
    |> Seq.sum
// Real: 00:00:00.252, CPU: 00:00:00.234, GC gen0: 0, gen1: 0, gen2: 0
let streamValue =
    data
    |> Stream.ofArray
    |> Stream.filter (fun x -> x%2L = 0L)
    |> Stream.map (fun x -> x * x)
    |> Stream.sum
// Real: 00:00:00.119, CPU: 00:00:00.125, GC gen0: 0, gen1: 0, gen2: 0
Note that for operations over arrays, the F# Array module would be a more appropriate choice and is slightly faster:
let arrayValue =
    data
    |> Array.filter (fun x -> x%2L = 0L)
    |> Array.map (fun x -> x * x)
    |> Array.sum
// Real: 00:00:00.094, CPU: 00:00:00.093, GC gen0: 0, gen1: 0, gen2: 0
LINQ also does quite well here, as it has specialized overloads, including one for summing over int64 values:
open System.Linq
let linqValue =
    data
        .Where(fun x -> x%2L = 0L)
        .Select(fun x -> x * x)
        .Sum()
// Real: 00:00:00.058, CPU: 00:00:00.062, GC gen0: 0, gen1: 0, gen2: 0
However, with F# Interactive running in 64-bit mode, Streams take back the advantage (thanks to Nick Palladinos for the tip):
let streamValue =
    data
    |> Stream.ofArray
    |> Stream.filter (fun x -> x%2L = 0L)
    |> Stream.map (fun x -> x * x)
    |> Stream.sum
// Real: 00:00:00.033, CPU: 00:00:00.031, GC gen0: 0, gen1: 0, gen2: 0
Looks like the 64-bit JIT is doing some black magic there.
Parallel Performance
Switching to the full Nessos Streams library, there’s support for parallel streams via the ParStream module:
let parsStreamValue =
    data
    |> ParStream.ofArray
    |> ParStream.filter (fun x -> x%2L = 0L)
    |> ParStream.map (fun x -> x + 1L)
    |> ParStream.sum
// Real: 00:00:00.069, CPU: 00:00:00.187, GC gen0: 0, gen1: 0, gen2: 0
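This demonstrates a good performance increase with little effort, although this pipeline maps with x + 1L rather than x * x, so the timing is not directly comparable with the earlier runs.
For larger computations, Nessos Streams supports cloud-based parallel operations against Azure.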
Overall Nessos Streams looks like a good alternative to the Seq module for functional pipelines.
Nessos LinqOptimizer
For further optimization Gian recommended the Nessos LinqOptimizer:
An automatic query optimizer-compiler for Sequential and Parallel LINQ. LinqOptimizer compiles declarative LINQ queries into fast loop-based imperative code. The compiled code has fewer virtual calls and heap allocations, better data locality and speedups of up to 15x
The benchmarks are impressive.
Reactive Extensions (Rx)
One of the questions in the talk, and on Twitter later, was: given that Rx is also a push model, how does the performance compare?
Clearly the Nessos Streams library and Rx have different goals (data processing vs event processing), but I thought it would be interesting to compare them all the same:
open System.Reactive.Linq
let rxValue =
    data
        .ToObservable()
        .Where(fun x -> x%2L = 0L)
        .Select(fun x -> x * x)
        .Sum()
        .ToEnumerable()
    |> Seq.head
// Real: 00:00:02.895, CPU: 00:00:02.843, GC gen0: 120, gen1: 0, gen2: 0
let streamValue =
    data
    |> Stream.ofArray
    |> Stream.filter (fun x -> x%2L = 0L)
    |> Stream.map (fun x -> x * x)
    |> Stream.sum
// Real: 00:00:00.130, CPU: 00:00:00.109, GC gen0: 0, gen1: 0, gen2: 0
In this naive comparison you can see Nessos Streams is roughly 20 times faster than Rx.
Observable module
F# also has a built-in Observable module for operations over IObservable<T> (support for operations over events was added to F# back in 2006). Based on the claims on Rx performance made by Matt Podwysocki I was curious to see how it stacked up:
let obsValue =
    data
    |> Observable.ofSeq
    |> Observable.filter (fun x -> x%2L = 0L)
    |> Observable.map (fun x -> x * x)
    |> Observable.sum
    |> Observable.first
// Real: 00:00:00.479, CPU: 00:00:00.468, GC gen0: 18, gen1: 0, gen2: 0
As you can see, the Observable module comes off roughly 6 times faster than Rx (0.479s against 2.895s).
Note: I had to add some simple combinators to make this work; you can see the full snippet here: http://fssnip.net/ow
Summary
Nessos Streams look like a promising direction for the performance of functional pipelines, and for gaining raw imperative performance the Nessos LinqOptimizer is impressive.