In this post we will look at using a reactive programming paradigm within F#. Some of you may have used the Reactive Extensions (Rx). I am in fact a massive fanboy of Rx, and really enjoy what it can bring to a project; a UI project in particular benefits immensely from a more reactive approach.
There may of course be those of you that have never come across Rx at all. So let's take a very small detour and talk about the general idea of the observer pattern.
Observer Pattern
Let's say I have an order system that should print invoices and also send emails to a client when a new order is received. A naive implementation of this may be to just lump all this into a single class, but this is a poor separation of concerns; we could do better. What we could do is have an order system object which receives orders and then calls into 2 other classes that produce an invoice and send an email to a client.
This is OK, but we would now have to take a strong dependency on the subsystems from the order system class, and then call them when a new order arrives. Wouldn't it be better if the order system class could just accept a list of subscribers (say implementing some IAcceptOrder interface), and then any time a new order arrives the order system class would simply loop through its list of subscribers and call their AcceptOrder method (from the hypothetical IAcceptOrder interface)?
This in essence is the observer pattern. Of course, this pattern could also allow unsubscribing.
Rx has built upon the general idea of the observer pattern and provides a great many standard .NET classes that aid in the construction of observable streams of data. You can also use subscriptions over events and asynchronous operations, and use the standard LINQ operators.
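The idea above can be sketched in a few lines of F#. This is only an illustration: the Order record, the Subscribe and NewOrder members, and the two subscribers are hypothetical names made up for this sketch; only IAcceptOrder and AcceptOrder come from the description above. Returning an IDisposable from Subscribe is one possible way to support unsubscribing.

```fsharp
open System
open System.Collections.Generic

// Hypothetical order type, just enough for the sketch
type Order = { Id : int; Total : decimal }

// The hypothetical subscriber interface from the text
type IAcceptOrder =
    abstract AcceptOrder : Order -> unit

type OrderSystem() =
    let subscribers = List<IAcceptOrder>()

    // Register a subscriber; disposing the returned handle unsubscribes it
    member this.Subscribe(subscriber : IAcceptOrder) : IDisposable =
        subscribers.Add(subscriber)
        { new IDisposable with
            member d.Dispose() = subscribers.Remove(subscriber) |> ignore }

    // Loop through the current subscribers and notify each one
    member this.NewOrder(order : Order) =
        for s in subscribers.ToArray() do
            s.AcceptOrder order

// Records what each subscriber did, instead of really printing or emailing
let log = List<string>()

let invoicePrinter =
    { new IAcceptOrder with
        member s.AcceptOrder order = log.Add(sprintf "invoice %d" order.Id) }

let emailSender =
    { new IAcceptOrder with
        member s.AcceptOrder order = log.Add(sprintf "email %d" order.Id) }

let orders = OrderSystem()
let invoiceSub = orders.Subscribe(invoicePrinter)
let emailSub = orders.Subscribe(emailSender)

orders.NewOrder({ Id = 1; Total = 99.0m })   // both subscribers are called
emailSub.Dispose()                           // unsubscribe the email sender
orders.NewOrder({ Id = 2; Total = 10.0m })   // only the invoice printer is called
```

The order system never references the concrete invoice or email classes, only the interface, which is the separation of concerns the pattern is after.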
So that is what Rx gives you.
F# also comes with the Control.Observable and Control.Event modules, which contain many functions that can be used to create reactive code. The functions in these modules mirror some of the functionality found in the Rx classes (see http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable(v=vs.103).aspx).
This article is however about F#, so from here on out we will keep the discussion to the F# Observable module.
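To give a feel for how closely the two modules mirror each other, here is a minimal sketch that runs the same filter/map/add pipeline through both. The source event and the two result lists are hypothetical names used only for this illustration.

```fsharp
open System.Collections.Generic

// A hypothetical event source that publishes integers
let source = Event<int>()

let fromEventModule = List<int>()
let fromObservableModule = List<int>()

// The Event module works on IEvent values and gives IEvent values back
source.Publish
|> Event.filter (fun n -> n % 2 = 0)
|> Event.map (fun n -> n * 10)
|> Event.add (fun n -> fromEventModule.Add(n))

// The Observable module accepts the same event as an IObservable
source.Publish
|> Observable.filter (fun n -> n % 2 = 0)
|> Observable.map (fun n -> n * 10)
|> Observable.add (fun n -> fromObservableModule.Add(n))

for n in 1 .. 4 do
    source.Trigger(n)
```

Both lists end up with the same values here; the main visible difference in this sketch is the type each pipeline stage hands on (IEvent versus IObservable).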
Observable Module
The F# Observable module is the place to start for reactive programming in F#, and contains the following functions: add, choose, filter, map, merge, pairwise, partition, scan, split and subscribe.
We will be seeing some examples of these in just a minute, but before we do it is worth going on another slight detour. So let's get it over with before we start to look at the Observable module in depth.
<SlightDetour>
There were several things that made Rx so powerful:
- The ability to treat events as a source to create an IObservable<T>. This IObservable<T> could then have any of the standard LINQ operators applied to it, along with another specific set of IObservable<T> extension methods. This is a very powerful technique, which allows you to do things like only listen to an event where some condition is true, project the original event args into a completely new type, or merge 2 or more events into a single stream (this technique is particularly powerful when building UIs, I use this one a lot)
- The decoupling of the event into a much more general purpose interface (IObservable<T>), which means that users of the original event source didn't need a strong reference to the original event source class (they could be in different DLLs). This is a much better / cleaner design: just rely on a type (IObservable<T>) which is an interface in the base class library
- The ability to subscribe, which would return an IDisposable, which you could then just wrap in a using(..) or Dispose of when you were done with it, both of which would stop the subscriber from receiving any further notifications
The F# team (Don Syme, I guess) have exposed some (though not all) of the Rx goodness in F#, so it is important to understand some of the reasons why IObservable<T> was so useful.
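The first point (filter on a condition, project into a new type, merge several events into one stream) can be sketched with the built-in F# Observable module alone. The clicks and keys events below are hypothetical stand-ins for real UI events, and the seen list just records what reaches the handler.

```fsharp
open System.Collections.Generic

// Two hypothetical event sources standing in for UI events
let clicks = Event<int * int>()   // (x, y) position of a click
let keys = Event<char>()          // a key press

let seen = List<string>()

// Only listen to clicks where a condition holds, and project them to text
let clickText =
    clicks.Publish
    |> Observable.filter (fun (_, y) -> y > 100)
    |> Observable.map (fun (x, y) -> sprintf "click %d,%d" x y)

// Project key presses to the same type so the streams can be merged
let keyText =
    keys.Publish
    |> Observable.map (fun c -> sprintf "key %c" c)

// Merge both streams into one and attach a single handler
Observable.merge clickText keyText
|> Observable.add (fun s -> seen.Add(s))

clicks.Trigger((10, 50))    // filtered out, y is not > 100
clicks.Trigger((10, 150))
keys.Trigger('a')
```

The handler at the end only depends on IObservable<string>; it has no knowledge of which event source produced each value.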
</SlightDetour>
OK, now that we have talked about why Observable is better than a plain old event, let's have a look at some examples using the F# Observable module. Here is a small Windows Forms example (yes, you can do forms in F# quite easily too) that demonstrates the following things:
- How to create an IObservable<'a> from a standard event using Observable.filter, where the filter is just being used to give back an IObservable<'a>, in essence no filtering at all
- How to use Observable.add to add a handler to an IObservable<'a>
- How to create an actual filter from an IObservable<'a> using Observable.filter; this filter ensures that only MouseMove events that happen in the bottom 1/2 of the form notify listeners
- How to create an IDisposable subscription using Observable.subscribe
- How to cancel a subscription by simply calling Dispose on the subscription, which is an IDisposable. In this example this is done using a Task.Delay(4000), which means the subscriber will only work for 4 seconds, and then will not receive any notifications after that
open System
open System.Drawing
open System.Windows.Forms
open System.Threading.Tasks

[<EntryPoint; STAThread>]
let main argv =

    let form = new Form(Text = "F# Windows Form",
                        Visible = true,
                        TopMost = true)

    let label1 = new Label()
    label1.Text <- "Label1"
    label1.Location <- new Point(10,10)

    let txt1 = new TextBox()
    txt1.Width <- 160
    txt1.Location <- new Point(120,10)

    let label2 = new Label()
    label2.Text <- "Label2"
    label2.Location <- new Point(10,40)

    let txt2 = new TextBox()
    txt2.Width <- 160
    txt2.Location <- new Point(120,40)

    form.Controls.Add(label1)
    form.Controls.Add(txt1)
    form.Controls.Add(label2)
    form.Controls.Add(txt2)

    // A filter that lets everything through, used only to get an IObservable
    form.MouseMove
        |> Observable.filter ( fun evArgs -> true)
        |> Observable.add ( fun evArgs ->
            txt1.Text <- String.Format("x: {0} y :{1}", evArgs.X, evArgs.Y))

    // A real filter: only mouse moves in the bottom half of the form
    let sub =
        form.MouseMove
            |> Observable.filter ( fun evArgs -> evArgs.Y > form.Height / 2)
            |> Observable.subscribe ( fun evArgs ->
                txt2.Text <- String.Format("x: {0} y :{1}", evArgs.X, evArgs.Y))

    // Cancel the second subscription after 4 seconds
    Task.Delay(4000).ContinueWith(fun x -> sub.Dispose()) |> ignore

    Application.Run(form)

    0
When run, it looks like this:
With this one I urge you to try the code out for yourself, as a screenshot cannot show that the TextBox next to Label2 stops updating after 4 seconds.
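If you do not have a form handy, the lifetime of a subscription can also be seen in a console-style sketch. Instead of a timer it relies on the F# `use` binding (the counterpart of the using(..) mentioned earlier), which disposes the subscription when the enclosing function returns. The ticks event, the received list and the listenWhile helper are hypothetical names for this illustration only.

```fsharp
open System.Collections.Generic

// A hypothetical event source
let ticks = Event<int>()
let received = List<int>()

// The subscription only lives while this function is running:
// `use` calls Dispose on it when the function returns
let listenWhile (action : unit -> unit) =
    use sub = ticks.Publish |> Observable.subscribe (fun n -> received.Add(n))
    action ()

ticks.Trigger(1)            // nobody is subscribed yet
listenWhile (fun () ->
    ticks.Trigger(2)
    ticks.Trigger(3))
ticks.Trigger(4)            // the subscription has already been disposed
```

Only the values raised while the subscription was alive reach the handler, which is the same effect the Task.Delay(4000) produces in the form example.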
This is cool for sure, but the real power of Observable is that you can use it against your own events too, so let's wrap up this post by looking at an example where we create our own event source, and look at a few more of the Observable module functions.
Here is a small class that contains a single “NewOrderEvent”, which uses a custom EventArgs derived class called “OrderArgs”. The “OrderArgs” class has the following 2 properties:
- Price: decimal
- AuthorisationLevel: which uses a simple discriminated union type called “DiscountApprovalLevel”.
Here is the relevant code:
namespace ConsoleApplication1

module CustomTypes =

    open System

    // The level of approval that an order's discount requires
    type DiscountApprovalLevel = Standard | Manager | Ceo

    type Order = { Price : decimal; AuthorisationLevel : DiscountApprovalLevel }

    // Custom event args carrying the details of the new order
    type OrderArgs(price : decimal, authorisationLevel : DiscountApprovalLevel) =
        inherit System.EventArgs()
        member this.Price = price
        member this.AuthorisationLevel = authorisationLevel

    type OrderChangeDelegate = delegate of obj * OrderArgs -> unit

    type OrderSystem() =
        let newOrderEvent = new Event<OrderChangeDelegate, OrderArgs>()

        // Raises NewOrderEvent for the given order
        member this.CreateOrder(order : Order) =
            newOrderEvent.Trigger(this, new OrderArgs(order.Price, order.AuthorisationLevel))

        [<CLIEvent>]
        member this.NewOrderEvent = newOrderEvent.Publish
So that is the code for the source of the custom event. How about the Observable code? Let's see that next:
open System
open ConsoleApplication1.CustomTypes

[<EntryPoint>]
let main argv =

    let orderSystem = new OrderSystem()

    // Drop Ceo orders, then split what is left into Standard and Manager streams
    let stdDiscountObservable, managerApprovalObservable =
        orderSystem.NewOrderEvent
            |> Observable.filter(fun orderArgs ->
                match orderArgs.AuthorisationLevel with
                | Standard | Manager -> true
                | _ -> false)
            |> Observable.partition(fun orderArgs ->
                orderArgs.AuthorisationLevel = DiscountApprovalLevel.Standard)

    stdDiscountObservable.Add(fun args -> printfn "Price : %A, Level : %A" args.Price args.AuthorisationLevel)
    managerApprovalObservable.Add(fun args -> printfn "Price : %A, Level : %A" args.Price args.AuthorisationLevel)

    orderSystem.CreateOrder( { Price = 120.0m; AuthorisationLevel = Manager } )
    orderSystem.CreateOrder( { Price = 240.0m; AuthorisationLevel = Ceo } )
    orderSystem.CreateOrder( { Price = 10.0m; AuthorisationLevel = Standard } )
    orderSystem.CreateOrder( { Price = 20.0m; AuthorisationLevel = Standard } )
    orderSystem.CreateOrder( { Price = 50.0m; AuthorisationLevel = Standard } )
    orderSystem.CreateOrder( { Price = 240.0m; AuthorisationLevel = Ceo } )

    Console.ReadLine() |> ignore

    0
There are several things to point out in this code:
- We use a proper Observable.filter this time, such that only OrderArgs with an AuthorisationLevel of “Standard” or “Manager” come through. This means any event whose OrderArgs has an AuthorisationLevel of “Ceo” is effectively ignored
- We use Observable.partition to partition the source IObservable<OrderArgs> into 2 separate IObservable<OrderArgs>
Here is what this code looks like when it is run:
Using More Of The Rx Extension Methods
As I previously stated, I am a massive Rx fanboy, so I was a little disappointed to see that the F# Observable module did not have the full set of extension methods that Rx has for IObservable<T>. However it seems I am not alone here, and some people have put together a GitHub project which brings the standard Rx extension methods to F#. Here is a link: https://github.com/fsprojects/FSharp.Reactive
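Before reaching for an extra library, it is worth noting that the built-in module does go a little beyond filter and partition. Here is a minimal sketch using Observable.choose (filter and project in one step) and Observable.scan (a running aggregate). It reuses the Order and DiscountApprovalLevel shapes from earlier, but the plain Event<Order> source and the runningTotals list are simplifications assumed for this illustration, not part of the original example.

```fsharp
open System.Collections.Generic

type DiscountApprovalLevel = Standard | Manager | Ceo
type Order = { Price : decimal; AuthorisationLevel : DiscountApprovalLevel }

// A plain F# event standing in for the OrderSystem event
let newOrder = Event<Order>()
let runningTotals = List<decimal>()

newOrder.Publish
// choose: skip Ceo orders and keep just the price of the others
|> Observable.choose (fun (o : Order) ->
    match o.AuthorisationLevel with
    | Ceo -> None
    | _ -> Some o.Price)
// scan: emit the accumulated total each time a price arrives
|> Observable.scan (fun total price -> total + price) 0m
|> Observable.add (fun total -> runningTotals.Add(total))

newOrder.Trigger({ Price = 120.0m; AuthorisationLevel = Manager })
newOrder.Trigger({ Price = 240.0m; AuthorisationLevel = Ceo })
newOrder.Trigger({ Price = 10.0m; AuthorisationLevel = Standard })
```

For time-based or more advanced operators you would still need Rx itself or a wrapper such as the project linked above.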