Phillip Trelford's Array

POKE 36879,255

Seq vs Streams

Last week Gian Ntzik gave a great talk at the F#unctional Londoners meetup on the Nessos Streams library. It’s a lightweight F#/C# library for efficient functional-style pipelines on streams of data.

The main difference between LINQ/Seq and Streams is that LINQ is about composing external iterators (Enumerable/Enumerator) and Streams is based on the continuation-passing-style composition of internal iterators, which makes optimisations such as loop fusion easier.
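To make the distinction concrete, here is a minimal sketch that sums the same array both ways. The names sumPull and sumPush are made up for illustration and are not part of either library. With an external iterator the consumer pulls each value; with an internal iterator the producer pushes each value into a callback (the continuation).

```fsharp
// External (pull) iteration: the consumer drives an enumerator.
let sumPull (source: seq<int64>) : int64 =
    use e = source.GetEnumerator()
    let mutable total = 0L
    while e.MoveNext() do
        total <- total + e.Current
    total

// Internal (push) iteration: the producer drives a callback.
// 'iterate' is a hypothetical stand-in for a stream: it calls k once per element.
let sumPush (iterate: (int64 -> unit) -> unit) : int64 =
    let total = ref 0L
    iterate (fun x -> total.Value <- total.Value + x)
    total.Value

let numbers = [| 1L; 2L; 3L; 4L |]

let pulled = sumPull numbers
let pushed = sumPush (fun k -> Array.iter k numbers)
```

In the push version there is no MoveNext/Current pair to call per element, just a function call, which is the shape the simple Stream type later in the post builds on.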

The slides (using FsReveal) and samples are available on Gian’s GitHub repository.

Simple Streams

Gian started the session by live coding a simple implementation of streams in about 20 minutes:

open System.Collections.Generic // for List<'T>, used by toArray below

type Stream<'T> = ('T -> unit) -> unit

let inline ofArray (source: 'T[]) : Stream<'T> =
   fun k ->
      let mutable i = 0
      while i < source.Length do
            k source.[i]
            i <- i + 1          

let inline filter (predicate: 'T -> bool) (stream: Stream<'T>) : Stream<'T> =
   fun k -> stream (fun value -> if predicate value then k value)

let inline map (mapF: 'T -> 'U) (stream: Stream<'T>) : Stream<'U> =
   fun k -> stream (fun v -> k (mapF v))

let inline iter (iterF: 'T -> unit) (stream: Stream<'T>) : unit =
   stream (fun v -> iterF v)

let inline toArray (stream: Stream<'T>) : 'T [] =
   let acc = new List<'T>()
   stream |> iter (fun v -> acc.Add(v))
   acc.ToArray()

let inline fold (foldF:'State->'T->'State) (state:'State) (stream:Stream<'T>) =
   let acc = ref state
   stream (fun v -> acc := foldF !acc v)
   !acc

let inline reduce (reducer: ^T -> ^T -> ^T) (stream: Stream< ^T >) : ^T
      when ^T : (static member Zero : ^T) =
   fold (fun s v -> reducer s v) LanguagePrimitives.GenericZero stream

let inline sum (stream : Stream< ^T>) : ^T
      when ^T : (static member Zero : ^T)
      and ^T : (static member (+) : ^T * ^T -> ^T) =
   fold (+) LanguagePrimitives.GenericZero stream

As you can see, the whole implementation is only about 40 lines of code.
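The samples that follow call these functions as Stream.ofArray, Stream.filter and so on, which suggests they live in a module named Stream. The sketch below shows one way that could be arranged, with a cut-down set of combinators and a small pipeline to try it on. The module layout is an assumption on my part rather than something shown in the listing above.

```fsharp
type Stream<'T> = ('T -> unit) -> unit

// Assumed layout: the combinators grouped in a module named Stream.
module Stream =
    let inline ofArray (source: 'T[]) : Stream<'T> =
        fun k -> for x in source do k x

    let inline filter (predicate: 'T -> bool) (stream: Stream<'T>) : Stream<'T> =
        fun k -> stream (fun v -> if predicate v then k v)

    let inline map (mapF: 'T -> 'U) (stream: Stream<'T>) : Stream<'U> =
        fun k -> stream (fun v -> k (mapF v))

    let inline fold (foldF: 'State -> 'T -> 'State) (state: 'State) (stream: Stream<'T>) : 'State =
        let acc = ref state
        stream (fun v -> acc.Value <- foldF acc.Value v)
        acc.Value

// Sum of the squares of the even numbers from 1 to 10.
let smallValue =
    [| 1L .. 10L |]
    |> Stream.ofArray
    |> Stream.filter (fun x -> x % 2L = 0L)
    |> Stream.map (fun x -> x * x)
    |> Stream.fold (+) 0L
```

Nothing runs until the final fold supplies a continuation; ofArray, filter and map only build up nested functions.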

Sequential Performance

Just with this simple implementation, Gian was able to demonstrate a significant performance improvement over F#’s built-in Seq module for a simple pipeline:

#time // Turns on timing in F# Interactive

let data = [|1L..1000000L|]

let seqValue = 
   data
   |> Seq.filter (fun x -> x%2L = 0L)
   |> Seq.map (fun x -> x * x)
   |> Seq.sum
// Real: 00:00:00.252, CPU: 00:00:00.234, GC gen0: 0, gen1: 0, gen2: 0

let streamValue =
   data
   |> Stream.ofArray
   |> Stream.filter (fun x -> x%2L = 0L)
   |> Stream.map (fun x -> x * x)
   |> Stream.sum
// Real: 00:00:00.119, CPU: 00:00:00.125, GC gen0: 0, gen1: 0, gen2: 0
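One way to picture the loop fusion mentioned earlier: once the inline combinators are composed, the stream pipeline is conceptually a single pass over the array with no intermediate collections. A hand-written sketch of that fused loop (an illustration of the idea, not the compiler's actual output) might look like this:

```fsharp
let data = [| 1L .. 1000000L |]

// Filter, map and sum collapsed into one loop by hand.
let fusedValue =
    let mutable acc = 0L
    for x in data do
        if x % 2L = 0L then
            acc <- acc + x * x
    acc

// The same computation through Seq, for comparison.
let seqValue =
    data
    |> Seq.filter (fun x -> x % 2L = 0L)
    |> Seq.map (fun x -> x * x)
    |> Seq.sum
```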

Note that for operations over arrays, the F# Array module would be a more appropriate choice and is slightly faster:

let arrayValue =
   data
   |> Array.filter (fun x -> x%2L = 0L)
   |> Array.map (fun x -> x * x)
   |> Array.sum
// Real: 00:00:00.094, CPU: 00:00:00.093, GC gen0: 0, gen1: 0, gen2: 0

LINQ also does quite well here, as it has specialized overloads, including one for summing over int64 values:

open System.Linq

let linqValue =   
   data
      .Where(fun x -> x%2L = 0L)
      .Select(fun x -> x * x)
      .Sum()
// Real: 00:00:00.058, CPU: 00:00:00.062, GC gen0: 0, gen1: 0, gen2: 0

However, with F# Interactive running in 64-bit mode, Streams take back the advantage (thanks to Nick Palladinos for the tip):

let streamValue =
   data
   |> Stream.ofArray
   |> Stream.filter (fun x -> x%2L = 0L)
   |> Stream.map (fun x -> x * x)
   |> Stream.sum
// Real: 00:00:00.033, CPU: 00:00:00.031, GC gen0: 0, gen1: 0, gen2: 0

Looks like the 64-bit JIT is doing some black magic there.
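If you are not sure which mode your F# Interactive session is running in, a quick check along these lines should tell you. It only uses standard .NET properties, and the variable names are just for illustration.

```fsharp
open System

// True when the current process (for example, the FSI session) is 64-bit.
let is64BitProcess = Environment.Is64BitProcess

// Pointer size in bytes: 8 in a 64-bit process, 4 in a 32-bit one.
let pointerSize = IntPtr.Size

printfn "64-bit process: %b (pointer size: %d bytes)" is64BitProcess pointerSize
```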

Parallel Performance

Switching to the full Nessos Streams library, there’s support for parallel streams via the ParStream module:

let parsStreamValue =
   data
   |> ParStream.ofArray
   |> ParStream.filter (fun x -> x%2L = 0L)
   |> ParStream.map (fun x -> x + 1L)
   |> ParStream.sum
// Real: 00:00:00.069, CPU: 00:00:00.187, GC gen0: 0, gen1: 0, gen2: 0

which demonstrates a good performance increase with little effort.
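The talk summary here does not go into how ParStream works internally, so the following is only a sketch of the general idea behind a parallel filter/map/sum. It uses Array.Parallel.map from FSharp.Core over index chunks rather than the Nessos library, and the chunking scheme and names are my own assumptions.

```fsharp
open System

let data = [| 1L .. 1000000L |]

// Assumed scheme: one chunk of indices per logical processor.
let chunkCount = Environment.ProcessorCount
let chunkSize = (data.Length + chunkCount - 1) / chunkCount

// Each chunk filters, maps and sums its own slice; the chunks run in parallel.
let partialSums =
    [| 0 .. chunkCount - 1 |]
    |> Array.Parallel.map (fun c ->
        let lo = c * chunkSize
        let hi = min data.Length (lo + chunkSize)
        let mutable acc = 0L
        for i in lo .. hi - 1 do
            let x = data.[i]
            if x % 2L = 0L then
                acc <- acc + (x + 1L)
        acc)

// Combine the per-chunk results sequentially.
let chunkedValue = Array.sum partialSums
```

Note that in the ParStream timing above the CPU time is larger than the real time, which is what you would expect when the work is spread across several cores.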

For larger computations, Nessos Streams supports cloud-based parallel operations against Azure.

Overall Nessos Streams looks like a good alternative to the Seq module for functional pipelines.

Nessos LinqOptimizer

For further optimization Gian recommended the Nessos LinqOptimizer:

An automatic query optimizer-compiler for Sequential and Parallel LINQ. LinqOptimizer compiles declarative LINQ queries into fast loop-based imperative code. The compiled code has fewer virtual calls and heap allocations, better data locality and speedups of up to 15x

The benchmarks are impressive:

[Figure: LinqOptimizer benchmark chart (cs-ssq)]

Reactive Extensions (Rx)

One of the questions raised in the talk, and later on Twitter, was how the performance compares with Rx, given that Rx is also a push model.

Clearly the Nessos Streams library and Rx have different goals (data processing vs event processing), but I thought it would be interesting to compare them all the same:

open System.Reactive.Linq

let rxValue =
   data
      .ToObservable()
      .Where(fun x -> x%2L = 0L)
      .Select(fun x -> x * x)
      .Sum()      
      .ToEnumerable()
      |> Seq.head
// Real: 00:00:02.895, CPU: 00:00:02.843, GC gen0: 120, gen1: 0, gen2: 0

let streamValue =
   data
   |> Stream.ofArray
   |> Stream.filter (fun x -> x%2L = 0L)
   |> Stream.map (fun x -> x * x)
   |> Stream.sum
// Real: 00:00:00.130, CPU: 00:00:00.109, GC gen0: 0, gen1: 0, gen2: 0

In this naive comparison you can see Nessos Streams is roughly 20 times faster than Rx.

Observable module

F# also has a built-in Observable module for operations over IObservable<T> (support for operations over events was added to F# back in 2006). Based on the claims on Rx performance made by Matt Podwysocki I was curious to see how it stacked up:

let obsValue =
   data
   |> Observable.ofSeq
   |> Observable.filter (fun x -> x%2L = 0L)
   |> Observable.map (fun x -> x * x)
   |> Observable.sum
   |> Observable.first
// Real: 00:00:00.479, CPU: 00:00:00.468, GC gen0: 18, gen1: 0, gen2: 0

As you can see, the Observable module comes off roughly 6 times faster than Rx (0.479s versus 2.895s), though still slower than Streams.

Note: I had to add some simple combinators to make this work, you can see the full snippet here: http://fssnip.net/ow
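For readers who cannot reach the snippet, here is a rough idea of what such combinators could look like. This is my own hypothetical sketch, not the code from the linked snippet: it extends the built-in Observable module with ofSeq, sum and first, and assumes a synchronous source that pushes all its values during Subscribe.

```fsharp
open System

// Hypothetical helpers (not the linked snippet): added alongside the
// built-in Observable module, whose filter and map remain available.
module Observable =
    /// Pushes each element of a sequence to the observer, then completes.
    let ofSeq (source: seq<'T>) : IObservable<'T> =
        { new IObservable<'T> with
            member _.Subscribe(observer: IObserver<'T>) =
                for x in source do
                    observer.OnNext x
                observer.OnCompleted()
                { new IDisposable with
                    member _.Dispose() = () } }

    /// Emits the total once, when the source completes.
    let sum (source: IObservable<int64>) : IObservable<int64> =
        { new IObservable<int64> with
            member _.Subscribe(observer: IObserver<int64>) =
                let total = ref 0L
                source.Subscribe(
                    { new IObserver<int64> with
                        member _.OnNext(x) = total.Value <- total.Value + x
                        member _.OnError(e) = observer.OnError e
                        member _.OnCompleted() =
                            observer.OnNext total.Value
                            observer.OnCompleted() }) }

    /// Returns the first value pushed; assumes the source is synchronous.
    let first (source: IObservable<'T>) : 'T =
        let result : 'T option ref = ref None
        use _subscription =
            source.Subscribe(
                { new IObserver<'T> with
                    member _.OnNext(x) =
                        if result.Value.IsNone then result.Value <- Some x
                    member _.OnError(e) = raise e
                    member _.OnCompleted() = () })
        match result.Value with
        | Some value -> value
        | None -> invalidOp "The observable produced no values"

let smallObsValue =
    [| 1L .. 10L |]
    |> Observable.ofSeq
    |> Observable.filter (fun x -> x % 2L = 0L)
    |> Observable.map (fun x -> x * x)
    |> Observable.sum
    |> Observable.first
```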

Summary

Nessos Streams looks like a promising direction for the performance of functional pipelines, and for gaining raw imperative performance the Nessos LinqOptimizer is impressive.

Comments (2)

  • Art Scott

    11/18/2014 5:53:42 PM

    Hi Phil.
    What was the hardware/system used for measures?

    Thanks, Art.

  • Phil

    11/19/2014 12:26:54 AM

    Hi Art,

    I was using an MBP with a 2.4GHz Haswell i5 & 8GB RAM, and running F# Interactive inside VS2013 with default settings. Since writing the original cut of the article I've noticed that switching on 64-bit support gives a huge performance increase for Streams (thanks to Nick Palladinos for the tip).

    Cheers,
    Phil
