Phillip Trelford's Array

POKE 36879,255

What’s happening? Loosely coupled events

The Reactive Extensions (Rx) demonstrate some sweet syntax for handling CLR events. How about doing the same thing for loosely coupled events exposed through an event broker like Prism’s IEventAggregator? For example, you might want to publish an event from any window that can be observed by any other window, without the windows having intimate knowledge of each other.

The following is a simple F# event broker coded on the train this morning (Note: the interface ISubscribe<’a> could be replaced by IObservable<’a>):

/// Publishing side of a happening carrying values of type 'a
type ITrigger<'a> =
    /// Publishes the given value
    abstract member Trigger : 'a -> unit
    
/// Subscribing side of a happening carrying values of type 'a
type ISubscribe<'a> =     
    /// Registers a handler function; the interface offers no way to unsubscribe
    abstract member Subscribe : ('a -> unit) -> unit 

/// A happening that can be both triggered and subscribed to
type IHappen<'a> = 
    inherit ITrigger<'a> 
    inherit ISubscribe<'a>
            
/// Event broker: hands out happenings selected by their payload type
type IHappenings =
    /// Obtains the happening for payload type 'a
    abstract member ObtainHappening<'a> : unit -> IHappen<'a>
        
/// Default IHappenings implementation: keeps one IHappen<'a> per payload type 'a,
/// created on first request and shared by every later caller
type Happenings () =
    // Values are boxed because each entry is an IHappen<'a> for a different 'a.
    // The dictionary is private, so it also serves as the lock object for the
    // registry; locking on `this` would let outside code contend on the same monitor.
    let happenings = System.Collections.Generic.Dictionary<System.Type,obj>()
    interface IHappenings with
        /// Returns the happening registered for 'a, creating and registering it if absent
        member this.ObtainHappening<'a> () =
            let createHappening () =
                // Private ref cell: holds the subscriber list and doubles as its lock object
                let subscribers = ref []
                { new IHappen<'a> with
                    member __.Subscribe f =
                        lock subscribers (fun () ->
                            subscribers := f :: !subscribers)
                    member __.Trigger x =
                        // Handlers are prepended on subscribe, so the newest runs first
                        !subscribers |> List.iter (fun f -> f x)
                }
            lock happenings (fun () ->
                match happenings.TryGetValue(typeof<'a>) with
                | true, happen -> unbox happen
                | false, _ ->
                    let happen = createHappening ()
                    happenings.Add(typeof<'a>, box happen)
                    happen)

 

Then, in Starbucks for a coffee and some higher-order functions :

/// Higher-order combinators over ISubscribe<'a>; each takes the source happening
/// as its last argument so that they can be chained with |>
module Happening =
    open System
    open System.Windows.Threading

    /// Registers f as a handler on the happening
    let Subscribe<'a> f (happening:ISubscribe<'a>) =
        happening.Subscribe f

    /// Wraps the happening so that handlers are invoked through the dispatcher
    /// that was current when OnDispatcher itself was called
    let OnDispatcher<'a> (happening:ISubscribe<'a>) =
        let dispatcher = Dispatcher.CurrentDispatcher
        { new ISubscribe<'a> with
            member __.Subscribe f =
                let invokeOnDispatcher x =
                    dispatcher.Invoke(Action(fun () -> f x)) |> ignore
                happening.Subscribe invokeOnDispatcher
        }

    /// Wraps the happening so that handlers only see values for which filterF is true
    let Filter<'a> filterF (happening:ISubscribe<'a>) =
        { new ISubscribe<'a> with
            member __.Subscribe f =
                happening.Subscribe (fun x ->
                    match filterF x with
                    | true -> f x
                    | false -> ())
        }

    /// Wraps the happening so that handlers receive mapF applied to each value
    let Map<'a,'b> mapF (happening:ISubscribe<'a>) =
        { new ISubscribe<'b> with
            member __.Subscribe f =
                happening.Subscribe (mapF >> f)
        }

    /// Wraps the happening so that each handler call is started as a separate
    /// async workflow that first sleeps for the given number of milliseconds
    let Delay<'a> milliseconds (happening:ISubscribe<'a>) =
        { new ISubscribe<'a> with
            member __.Subscribe f =
                let fireLater x =
                    Async.Start (async {
                        do! Async.Sleep(milliseconds)
                        f x
                    })
                happening.Subscribe fireLater
        }

 

Finally, here is Matthew Podwysocki’s cool Time Flies Like An Arrow sample rewritten to use more loosely-coupled events:

/// WPF demo: the characters of a sentence trail the mouse pointer, driven by
/// mouse-move events that are republished through an IHappenings broker
module Test =
    open System
    open System.Windows
    open System.Windows.Controls
    open System.Windows.Input
    open System.Windows.Media

    /// Position of the mouse event relative to element, as an (x, y) pair
    let getPosition (element : #UIElement) (args : MouseEventArgs) =
        let point = args.GetPosition(element)
        (point.X, point.Y)

    /// Window that forwards its MouseMove events to the MouseEventArgs happening
    /// obtained from the broker and subscribes one handler per character to it
    type TimeFliesWindow(happenings:IHappenings) as this =
        inherit Window()

        // Title previously read "Time files like an arrow" (typo)
        do this.Title <- "Time flies like an arrow"

        let canvas =
           Canvas(Width=800.0, Height=400.0, Background = Brushes.White)
        do this.Content <- canvas

        // Bridge the CLR event into the broker; the subscription handle is discarded,
        // so the bridge stays attached for the lifetime of the window
        let happen = happenings.ObtainHappening<MouseEventArgs>()
        do this.MouseMove
            |> Observable.subscribe happen.Trigger |> ignore

        do "F# can react to first class events!"
            |> Seq.iteri(fun i c ->
                let s = TextBlock(Width=20.0,
                                Height=30.0,
                                FontSize=20.0,
                                Text=string c,
                                Foreground=Brushes.Black,
                                Background=Brushes.White)
                canvas.Children.Add(s) |> ignore
                // Character i follows the pointer i * 100 ms late and i * 10 units
                // to the right; the delayed callback is sent back through the
                // dispatcher before it touches the TextBlock
                happen
                |> Happening.Map (getPosition canvas)
                |> Happening.Delay (i * 100)
                |> Happening.OnDispatcher
                |> Happening.Subscribe (fun (x, y) ->
                     Canvas.SetTop(s, y)
                     Canvas.SetLeft(s, x + float ( i * 10)))
           )

    let happenings = new Happenings()
    let win = TimeFliesWindow(happenings)
    [<STAThread>]
    do (new Application()).Run(win) |> ignore

F# Agents and Retlang: quick comparison

Erlang-style message passing is a great way of simplifying concurrency. The open source Retlang library aims to bring some of the benefits to .Net languages like C# and VB.Net, whereas F# has built-in message passing support with the MailboxProcessor, also referred to as Agents. To compare Retlang against F# Agents I have coded up the Retlang Summation example (171 lines) as an equivalent F# implementation (41 lines):

/// Shorthand for the built-in mailbox processor
type Agent<'a> = MailboxProcessor<'a>

/// Messages understood by the summation agent
type Summation =
    /// Add the given number to the running total
    | Add of int
    /// Send the running total back on the reply channel
    | Total of AsyncReplyChannel<int>
/// Keeps a running total inside an agent; the total is threaded through the
/// message loop as an argument, so no locks or mutable fields are needed
and SummationAgent () =
    let agent = Agent.Start ( fun inbox ->
        let rec loop total =
            async {
                let! message = inbox.Receive()
                match message with
                // return! makes the recursive call a tail call; with do! every
                // message would leave one more pending continuation behind
                | Add n -> return! loop (n + total)
                // Replying is the last thing the loop does: it does not recurse,
                // so messages posted afterwards are never processed
                | Total reply -> reply.Reply(total)
            }
        loop 0
    )
    /// Adds value to total
    member this.Add n = Add(n) |> agent.Post
    /// Returns total and ends computation
    member this.Total () = (fun reply -> Total(reply)) |> agent.PostAndReply

/// Builds an async workflow that calls f with every number from 1 to limit in
/// ascending order, writing a progress line to the console after each tenth number
let numberSource f limit =
    async {
        for i in 1 .. limit do
            f i
            match i % 10 with
            | 0 -> System.Console.WriteLine("{0}\t({1})",i,limit)
            | _ -> ()
    }

do  // Summation agent instance shared by all number sources
    let summation = SummationAgent ()
    // One number source per limit, each posting its numbers to the agent
    let sources = [100;50;200] |> Seq.map (numberSource summation.Add)
    // Run the sources in parallel and wait until all of them have finished
    sources |> Async.Parallel |> Async.RunSynchronously |> ignore
    // Get total: 5050 + 1275 + 20100
    let total = summation.Total ()
    System.Diagnostics.Debug.Assert(26425 = total)
    System.Console.WriteLine(total)

 

If you compare this to the Retlang example implementation, it should be clear that C# is missing the easy message definition (discriminated unions) and pattern matching part of message passing. You may also notice that in the F# version, the functionality is accomplished, not only without locks, but also without any low-level threading primitives (like AutoResetEvent). However, if you find yourself constrained to C# development then Retlang is definitely worth a look.

The Associative Model of Data

Since the 1980s the 8086 architecture has dominated micro-processors and so too has the relational model. The x86 series has papered over the cracks with larger and larger chips adding huge caches and requiring smarter compilers, with the relational model seeing ever larger RDBMSs and ORMs.

Even with an ORM like Hibernate in place, to create a working data driven solution is cumbersome. We must define a database schema, along the way explicitly defining the bits and bytes of parent/child relationships, then an XML mapping file and finally plain old objects. As new features are added all of these definitions must be kept in synch.

For say a basic web store we may only require a few tables, say products, categories, orders and customers. But what if you wanted to extend the web store to have features like the online retailer Amazon, e.g. multiple sellers, recommendations, etc.?

Answer: serious table and relationship proliferation.

Enter an alternative model: the Associative model of data, a dynamic model where data is defined simply as items and links:

/// Associative data value: either a named item or a link between two values    
type Value =
    /// Item value, identified by its string
    | Item of string
    /// Link of source value, verb and target value; source and target are
    /// themselves values, so a link may have another link as its source
    | Link of Value * string * Value

 

The following is a minimal implementation of an Associative repository using F#:

/// Naive Associative Model implementation: all items and links are held in two
/// in-memory lists that are searched linearly
type Repository () =
    // Newest entries first, because additions are prepended
    let mutable items = []
    let mutable links = []
    let invalidOp s = raise (new System.InvalidOperationException(s))
    /// Returns the existing item with the given string, or creates and stores one
    let obtainItem value =
        let isNamed = function
            | Item name -> name = value
            | Link _ -> invalidOp ""
        match List.tryFind isNamed items with
        | Some existing -> existing
        | None ->
            let created = Item(value)
            items <- created :: items
            created
    /// Creates a link, stores it and returns it
    let createLink (source,verb,target) =
        let created = Link(source,verb,target)
        links <- created :: links
        created
    /// Applies f to the (source, verb, target) triple of a link; raises for an item
    let matchLink f value =
        match value with
        | Item _ -> invalidOp ""
        | Link(s,v,t) -> f (s,v,t)
    let filterLinks f = List.filter (matchLink f) links
    let chooseLinks f = List.choose (matchLink f) links
    let pickLink f = List.pick (matchLink f) links
    /// Renders an item as its string and a link as "source verb target"
    let rec toString = function
        | Item value -> value
        | Link (s,v,t) -> String.concat " " [toString s; v; toString t]
    /// Wraps a value in an IEntity whose queries run against this repository
    let rec createEntity (source:Value) =
        { new IEntity with
            member __.Add (verb,target) =
                let created = createLink (source, verb, obtainItem target)
                createEntity created
            member __.Value verb =
                let targetOf (s,v,t) =
                    if s = source && v = verb then Some(t) else None
                createEntity (pickLink targetOf)
            member __.Links verb =
                filterLinks (fun (s,v,_) -> s = source && v = verb)
                |> Seq.map createEntity
            member __.Values' verb =
                let sourceOf (s,v,t) =
                    if t = source && v = verb then Some(s) else None
                chooseLinks sourceOf |> Seq.map createEntity
            member __.ToString() = toString source
        }
    /// Gets or creates item
    member this.ObtainItem (value:string) =
        value |> obtainItem |> createEntity
/// Encapsulates associative data entity (an item or a link) and the queries
/// that can be made from it
and IEntity =
    /// Adds link with specified verb and target (verb first, then target string),
    /// returning the entity for the new link so that further links can hang off it
    abstract Add : string * string -> IEntity
    /// Returns all links from this entity matching the specified verb
    abstract Links : string -> IEntity seq
    /// Returns first value matching the specified verb.
    /// The Repository implementation looks it up with List.pick, which raises
    /// when no link from this entity has that verb
    abstract Value : string -> IEntity    
    /// Returns all values to this entity matching the specified verb, i.e. the
    /// sources of the links that have this entity as their target
    abstract Values' : string -> IEntity seq    
    /// Returns a string that represents this instance.
    /// NOTE(review): this is an interface member of its own; confirm that callers
    /// printing an entity through an obj-typed API actually reach it
    abstract ToString : unit -> string

 

Add some operator overloads to help prettify the code:

// Dynamic lookup operator overload: entity?verb is entity.Value "verb"
let (?) (source:IEntity) (verb:string) = verb |> source.Value
// Addition operator overload: entity + (verb, target) is entity.Add (verb, target).
// Being a let-bound operator, it replaces the built-in + in the code that follows.
let (+) (source:IEntity) (verb:string,target:string) = (verb, target) |> source.Add

 

Now we can build the flight example from Wikipedia:

// Repository shared by the examples
let r = Repository()
let flight = r.ObtainItem "Flight BA111"
// Each (verb, target) pair is added to the result of the previous addition,
// so every new link has the preceding link as its source
let trip =
    [ ("arrives at", "London Heathrow")
      ("on", "Dec 12")
      ("at", "10:25") ]
    |> List.fold (+) flight
do  System.Diagnostics.Debug.WriteLine(trip)

 

Or a fragment of a web store:

open System.Diagnostics   
   
do  let fsharpBooks = "F# Books"
    // First book: author, two sellers (each with its own price) and a category
    let title1 = "Functional Programming with examples in F# and C#"
    let book1 = r.ObtainItem(title1)
    ignore (book1 + ("author","Tomas Petricek"))
    ignore (book1 + ("sold by","Amazon") + ("price","27.99"))
    ignore (book1 + ("sold by","Paperback World") + ("price", "25.99"))
    ignore (book1 + ("category", fsharpBooks))
    // Second book, same shape
    let title2 = "Expert F#"
    let book2 = r.ObtainItem(title2)
    ignore (book2 + ("author", "Don Syme"))
    ignore (book2 + ("sold by","Amazon") + ("price","27.99"))
    ignore (book2 + ("sold by","Hardback World") + ("price","27.99"))
    ignore (book2 + ("category", fsharpBooks))
    // A user who has viewed both books
    let phil = r.ObtainItem("Phil")
    ignore (phil + ("viewed", title1))
    ignore (phil + ("viewed", title2))

    // Writes every "sold by" link of the item, each followed by that link's price
    let showItemInfo (item:IEntity) =
        for seller in item.Links("sold by") do
            Debug.WriteLine(seller)
            Debug.WriteLine(seller?price)
    showItemInfo book1
    showItemInfo book2

    // Everything that has a "sold by" link pointing at Amazon
    let amazon = r.ObtainItem("Amazon")
    for sold in amazon.Values'("sold by") do
        Debug.WriteLine(sold)

 

To serialize the data to XML simply add the following members to the repository:

/// Writes data to specified XmlWriter instance as a Repository element holding
/// one Item element per item; the links that start at a value are nested inside
/// that value's element as Link elements
member this.WriteTo (writer:XmlWriter) =
    let rec traverse value =
        // Open the element that describes the value itself
        match value with
        | Item text ->
            writer.WriteStartElement("Item")
            writer.WriteAttributeString("Value", text)
        | Link(_,verb,target) ->
            writer.WriteStartElement("Link")
            writer.WriteAttributeString("Verb",verb)
            writer.WriteAttributeString("Target",toString target)
        // Nest every link whose source is this value, then close the element
        for child in filterLinks (fun (s,_,_) -> s = value) do
            traverse child
        writer.WriteEndElement()
    writer.WriteStartElement("Repository")
    for item in items do
        traverse item
    writer.WriteEndElement()
/// Reads data from specified XmlReader instance, adding the items and links it
/// describes to this repository. Expects a Repository element containing Item
/// elements (attribute Value) with nested Link elements (attributes Verb and
/// Target); a Link's source is the Item or Link element that encloses it.
member this.ReadFrom (reader:XmlReader) =
    // Fails unless the current content node is <Repository>, then moves to the
    // node that follows the start tag
    reader.ReadStartElement("Repository")
    // Stack of the currently open Item/Link elements, innermost first
    let mutable xs = []
    // The reader is already positioned on the first node after <Repository>, so
    // that node is handled before Read() is called again; calling Read() first
    // would skip the first Item when the XML contains no whitespace
    let mutable hasNode = true
    while hasNode do
        match reader.NodeType, reader.Name with
        | XmlNodeType.Element, "Item" ->
            let value = reader.GetAttribute("Value")
            let item = obtainItem(value)
            // A self-closing element produces no EndElement node, so it must not
            // be pushed: nothing would ever pop it again
            if not reader.IsEmptyElement then xs <- item :: xs
        | XmlNodeType.Element, "Link" ->
            let source = xs.Head
            let verb = reader.GetAttribute("Verb")
            let target = reader.GetAttribute("Target")
            let link = createLink(source,verb,obtainItem target)
            if not reader.IsEmptyElement then xs <- link :: xs
        | XmlNodeType.EndElement, "Item"
        | XmlNodeType.EndElement, "Link" ->
            xs <- xs.Tail
        | _ -> ()
        hasNode <- reader.Read()

 

The implementation presented is purely for interest; there are many improvements and optimizations that could be made for a production system.

Finally, a Java implementation of the Associative model exists called Sentences, and is free.