ThreadSafeDictionary in Hopac in F#

In this post I’ll give you an introduction to Hopac in F#; you’ll learn how to encapsulate an abstract data type, like a dictionary, in an API that uses Hopac conventions.

First, why even bother with Hopac when there’s Async<_> and Task<_> and the obscure Microsoft flow-library and …? Because it gives you:

  • Cheap parallelism
  • A programming model that has stood the test of time since Concurrent ML was invented
  • The concept of selective choice, which proves very powerful in client-server communication (in process).
  • A high-performance environment for parallel and concurrent programming in a strongly typed language, F#.

Speaking for myself, writing code in Hopac is about as fun as it gets. I’ve written a high-performance logging library, Logary, that kicks ass. I’ve written a Remote PowerShell that used a behaviour tree to coordinate a distributed deployment between machines (orchestration) – in just a week – something that has taken Puppet its full software lifetime to do (and which is still not complete!), and many more things.

We also use Hopac extensively at qvitoo.com – it’s been one of those sturdy, well-behaved components of our system for some years now.

Interested? Then, let’s continue.

We’ll start by only skimming the implementation, which is as follows.

namespace HafCollections

open Hopac
open Hopac.Infixes

// Also has a gist:
// https://gist.github.com/haf/472f16d04a14f917be87f75f73a85064

/// A thread safe dictionary supports multiple-readers', multiple-writers'
/// access to a normal .Net dictionary.
type ThreadSafeDictionary<'K, 'V> =
  private {
    tryAddCh: Ch<'K * (unit -> 'V) * Ch<'V> * Promise<unit>>
    tryAddSelectCh: Ch<'K * (unit -> 'V) * ('V -> obj) * Ch<obj> * Promise<unit>>
    tryRemoveCh: Ch<'K * Ch<'V option> * Promise<unit>>
  }

/// A thread safe dictionary supports multiple-readers', multiple-writers'
/// access to a normal .Net dictionary.
module ThreadSafeDictionary =
  open System.Collections.Generic
  /// Creates a new thread safe dictionary.
  let create (): Job<ThreadSafeDictionary<'K, 'V>> =
    let tryFindAdd, tryFindAddSel, tryRemove = Ch (), Ch (), Ch ()
    let dic = Dictionary<'K, 'V>()
    let tryFind fac key =
      match dic.TryGetValue key with
      | false, _ -> fac ()
      | _, value -> value
    let selectOp =
      Alt.choose [
        tryFindAdd ^=> fun (key, fac, repl, nack) ->
          let value = tryFind fac key
          dic.[key] <- value
          Alt.choose [
            repl *<- value
            nack ^-> fun () -> ignore (dic.Remove key)
          ]

        tryFindAddSel ^=> fun (key, fac, selector, repl, nack) ->
          let value = tryFind fac key
          dic.[key] <- value
          Alt.choose [
            repl *<- selector value
            nack ^-> fun () -> ignore (dic.Remove key)
          ]

        tryRemove ^=> fun (key, repl, nack) ->
          match dic.TryGetValue key with
          | false, _ ->
            repl *<- None <|> nack
          | _, value ->
            ignore (dic.Remove key)
            Alt.choose [
              repl *<- Some value
              nack ^-> fun () -> dic.Add(key, value)
            ]
      ]

    Job.foreverServer selectOp >>-.
    { tryAddCh = tryFindAdd
      tryAddSelectCh = tryFindAddSel
      tryRemoveCh = tryRemove }

  let tryFindAdd key fac (x: _): Alt<'V> =
    x.tryAddCh *<+->- fun repl nack -> key, fac, repl, nack

  let tryFindAddSelect key factory (selector: 'V -> 'x) (x: _): Alt<'x> =
    let op =
      x.tryAddSelectCh *<+->- fun repl nack ->
      key, factory, selector >> box, repl, nack
    op ^-> unbox

  let tryRemove key (x: _): Alt<'V option> =
    x.tryRemoveCh *<+->- fun repl nack -> key, repl, nack

The following resources will get you started with Hopac. You may choose to read these before continuing, but you can equally read this blog entry to the end, read the resources, play around a bit and then re-read this blog entry to strengthen your knowledge of concurrent programming.

  1. Read the Hopac programming guide,
  2. or read the client-server cheat sheet.
  3. At this point you should be able to decipher how this thread safe dictionary is constructed.
  4. Preferably, read half of Reppy’s Concurrent ML book, which describes the background.

The basic structure of this type and its corresponding module is as follows. (It’s useful to bring up another browser window with the implementation by the side as you read along.)

  • The main constructor is the create function. It returns a job that starts the server loop. In this case, the server loop never exits, but instead is garbage collected when there are no live object references to it.
  • The type ThreadSafeDictionary<'K,'V> has a private constructor – an example of information hiding at its best. The idea is that you only use the module’s functions to interact with the dictionary. This is useful, because then it’s up to the caller (and not the constructor) to choose when and where to start the concurrent Job operation.
  • Meanwhile, the module’s functions are typed to the “unwrapped” value of ThreadSafeDictionary<'K,'V> (not a Job<_>). Note that, because the constructor is private, the consumer never needs to be concerned with the advanced generic types and can instead focus on the public API of the module’s functions.

Let’s walk through each client-side operation. By client-side operations I mean the module’s functions, which all return alternatives (Alt<_>).

  • tryFindAdd: This function takes a key, a factory and as its last parameter takes the object instance (that encapsulates the communication channels). By taking the object instance at the end, you enable the user to pipe the object through consecutive bind operators,

    val (>>=): Job<'a> -> ('a -> Job<'b>) -> Job<'b>
    

    because the second parameter is the callback, and you can partially apply the module’s function to match the signature, like so

    let sample = ThreadSafeDictionary.tryFindAdd "haf" factory
    // val it: ThreadSafeDictionary<'K, 'V> -> Alt<'V>
    

    hence:

    let sample2 = ThreadSafeDictionary.create() >>= sample
    // val it: Job<string>
    
  • tryFindAddSelect: This function uses a nifty trick, that you can let the caller (the client function tryFindAddSelect) box and unbox to avoid adding too many generics to the encapsulating type signature. It wouldn’t be as nice to have a ThreadSafeDictionary<'K, 'V, 'something> just because the API in the end needs to support a bit of extra typing.

  • tryRemove: This function is given a key and gives you back a selective choice encapsulated in an Alt<_>. Committing to the choice tells you whether the key was found (Some value) or was not present in the dictionary (None). A common anti-pattern in “regular C#/F#” is to first try to find the value and then remove it; however, since we want a single atomic action (the whole reason to encapsulate the dictionary in a Hopac server), we need to pass ALL the operational knowledge as parameters/return values. In normal speech amongst functional programmers, this is called a type-based approach (as opposed to an imperative variant).
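The boxing trick used by tryFindAddSelect can be looked at in isolation, without Hopac. The sketch below is a hypothetical stand-in (Store, askBoxed and ask are names made up for this illustration, not part of the dictionary): the inner function is generic in 'V only and therefore only ever sees a 'V -> obj, while the outer function boxes on the way in and unboxes on the way out.

```fsharp
// Hypothetical stand-in for the server side: generic in 'V only,
// so it can only handle selectors of type 'V -> obj.
type Store<'V> = { lookup: string -> 'V }

let askBoxed (selector: 'V -> obj) (key: string) (store: Store<'V>) : obj =
  selector (store.lookup key)

// Client-side wrapper: generic in 'x, but 'x never leaks into Store<'V>.
let ask (selector: 'V -> 'x) (key: string) (store: Store<'V>) : 'x =
  askBoxed (selector >> box) key store |> unbox<'x>

let store = { lookup = fun key -> "exists-" + key }

// The same Store<string> serves two different result types.
let length: int = store |> ask String.length "haf"
let shout: string = store |> ask (fun (s: string) -> s.ToUpper()) "haf"
```

The price is one box and one unbox per call; in return the encapsulating type keeps two type parameters instead of three.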

Finally, let’s try to understand the implementation of the ThreadSafeDictionary.

We start by creating the state that the loop will act upon.

let tryFindAdd, tryFindAddSel, tryRemove = Ch (), Ch (), Ch ()
let dic = Dictionary<'K, 'V>()

There are three channels, created via their constructors Hopac.Ch: unit -> Ch<_>. There’s also a Dictionary<,> that serves as the backing store/abstract data type of this wrapped implementation. A mutable dictionary like this, from .Net, is rather performant, so you’ll probably find that the overhead lies elsewhere, like in the contention amongst readers and writers.

By wrapping the selective choice over the channels in an Alt.choose, we can guarantee that the alternative (a subtype of Job<_>) is fully executed before another channel’s value is selected over and the corresponding callback executed (the code after ^=>). Because of this, we don’t have to be afraid of concurrent accesses to the underlying dictionary, and we can extract tryFind to its own function that closure-captures the dic value.

let tryFind fac key =
  match dic.TryGetValue key with
  | false, _ -> fac ()
  | _, value -> value
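One detail worth seeing in isolation: tryFind only reads. It calls the factory on a miss but does not store the result; storing is done by the server loop afterwards. A small sketch without Hopac, where the calls counter and the sample keys are assumptions made for the illustration:

```fsharp
open System.Collections.Generic

let dic = Dictionary<string, string>()
let calls = ref 0

// Hypothetical factory that counts how often it is invoked.
let fac () =
  calls.Value <- calls.Value + 1
  "made"

// Same shape as the tryFind in the implementation.
let tryFind fac key =
  match dic.TryGetValue key with
  | false, _ -> fac ()
  | _, value -> value

dic.["a"] <- "stored"
let hit = tryFind fac "a"            // key present: factory not called
let callsAfterHit = calls.Value
let miss = tryFind fac "b"           // key absent: factory called once
let callsAfterMiss = calls.Value
let stillAbsent = not (dic.ContainsKey "b")
```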

We can then assign selectOp the selective choice Alt<unit> value. Again, since Alt<unit> is a subtype of Job<unit>, we can later pass the alternative to

Job.foreverServer selectOp

Job.foreverServer has the signature Job<unit> -> Job<unit> and the documentation:

Creates a job that starts a separate server job that repeats the given job indefinitely. `foreverServer xJ` is equivalent to `forever xJ |> server`.

Meanwhile, server: Job<Void> -> Job<unit> with documentation:

Creates a job that immediately starts running the given job as a separate concurrent job like `start`, but the given job is known never to return normally, so the job can be spawned in an even more lightweight manner.

What this means is that after the call to foreverServer returns, the selectOp alternative has been upcast to a job (by the compiler), like so

selectOp :> Job<_>

Meanwhile, forever has created a tail-recursive job/a while loop that iteratively, forever, calls the given Job<_>. This Job<_> returned from forever is cold in the sense that nothing has scheduled it to run. This is where server comes into play and schedules the job to run on the Hopac Scheduler. The actual scheduling operation is itself a cooperative-multithreading operation; thus the returned value is a Job<unit>, which is an actual cold job to schedule the previous job on the scheduler.
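To make the forever/server pair concrete, here is a minimal sketch of a server loop that is much smaller than the dictionary. The running total, the reqCh channel and the add function are hypothetical names for this example, and the NuGet reference assumes you run it with dotnet fsi rather than referencing the DLLs directly.

```fsharp
#r "nuget: Hopac" // assumption: F# script run with `dotnet fsi`
open Hopac
open Hopac.Infixes

// Hypothetical state owned by the server loop.
let total = ref 0
let reqCh = Ch<int * Ch<int>> ()

// One iteration: take a request, update the state, reply with the new total.
let serveOne =
  reqCh ^=> fun (n, repl) ->
    total.Value <- total.Value + n
    repl *<- total.Value

// Building the job runs nothing; it is still cold here.
let startServer: Job<unit> = Job.foreverServer serveOne
// Running it schedules the loop on the Hopac scheduler and returns.
run startServer

// Client side: send a number together with a fresh reply channel.
let add (n: int): Job<int> =
  job {
    let repl = Ch<int> ()
    do! Ch.give reqCh (n, repl)
    return! Ch.take repl
  }

let afterOne = run (add 1)
let afterTwo = run (add 2)
```

Only the loop ever touches total, which is the same reason the dictionary needs no locks.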

Phew!

This made my head spin my first time around as well. The upside of this is that you can parameterise any constructor/function as you wish (in this case we don’t; using only unit/()) – get a Job<Something> back, and then get a new instance of Something every time you (schedule to) run the returned Job. This is in stark contrast with the Task<_> abstraction in System.Threading.Tasks, which is always hot (executing now).
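A small sketch of what cold means in practice. The runs counter is a hypothetical probe added for the illustration, and the NuGet reference assumes dotnet fsi.

```fsharp
#r "nuget: Hopac" // assumption: F# script run with `dotnet fsi`
open Hopac

// Hypothetical probe: counts how many times the job body executes.
let runs = ref 0

// Constructing the job does not execute the thunk.
let cold: Job<int> =
  Job.thunk (fun () ->
    runs.Value <- runs.Value + 1
    runs.Value)

let beforeRunning = runs.Value // still 0: nothing has been scheduled

// Each run executes the body again, from scratch.
let first = run cold
let second = run cold
```

The same Job value is reused for both runs; no closure had to be wrapped around it to make it repeatable.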

By making the Jobs/Alts cold, we gain the ability to compose them, without wrapping them in functions/closures, and then re-run them, avoiding GC allocations (for the Jobs/Alts). This is part of why Hopac is friendly for the GC and has such high performance.

With the underlying thinking out of the way, let’s continue down the rabbit hole with the actual implementation. Alt.choose “creates an alternative that is available when any one of the given alternatives is”. In effect, it takes a value from the first input channel that yields a value. Once a value has been taken, that input channel’s value (also an alternative/Alt<_>) has been committed to, and there’s no backing out of ‘getting’ that channel’s input value.

The value is then passed to the callback, via the operator ^=> which is similar to >>= (bind) but for alternatives. I say similar, because the alternative has already been committed to when the callback is called, so any potential side-effects that that Alt<_> may have had on commit are now non-reversible. (In this case, removing the input from the channel.) The textual representation of ^=> is afterJob: ('x -> #Job<'y>) -> Alt<'x> -> Alt<'y>, that is, it takes the callback to call with the x-value inside the alternative, when that alternative becomes available.

Or in other words, “do this function that returns a job of some sort, after the alt”.
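The difference between ^-> and ^=> is easiest to see on a single channel. In this sketch the channel ch and the doubling callbacks are made up for the illustration; the NuGet reference assumes dotnet fsi.

```fsharp
#r "nuget: Hopac" // assumption: F# script run with `dotnet fsi`
open Hopac
open Hopac.Infixes

let ch = Ch<int> ()

// ^-> (afterFun): the callback returns a plain value.
let doubled: Alt<int> = ch ^-> fun x -> x * 2

// ^=> (afterJob): the callback returns a job, which runs after the commit.
let doubledJob: Alt<int> = ch ^=> fun x -> Job.result (x * 2)

// *<+ sends asynchronously, so the take on ch finds a value waiting.
let viaFun = run ((ch *<+ 21) >>=. doubled)
let viaJob = run ((ch *<+ 21) >>=. doubledJob)
```

In both cases the value has already left the channel by the time the callback runs.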

So with this description of how alternatives work; here’s the corresponding implementation.

Alt.choose [
  tryFindAddSel ^=> fun (key, fac, selector, repl, nack) ->
    let value = tryFind fac key
    // ...

The function is tupled, because that’s the only way to ferry the values into a function callback. After -> is the body of the callback, where we try to find the value by key or we otherwise run the factory function. Then the side-effect is performed.

dic.[key] <- value

Here comes the NACK-/negative ACKnowledgement portion of the Hopac programming model.

Alt.choose [
  repl *<- selector value
  nack ^-> fun () -> ignore (dic.Remove key)
]

Because ^=> needs to return “something Job<_>”, the return value of Alt.choose is automatically upcast by the compiler, and we commit to the Alt returned. That alternative will always become available: either the repl (reply) channel is selected by the client that communicates with the server (ACK), or the client NACKs the Alt<_>, causing the nack: Promise<unit> to get a unit value (get filled). ^->/afterFun will then revert the side-effect by removing the value from the dictionary again.
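Where does the nack promise come from, and when is it filled? In the dictionary it is created by the *<+->- operator on the client side. The sketch below uses Alt.withNackJob directly instead, to show a nack being filled when a different alternative in the same choice wins. The names nackSeen, loser and winner are hypothetical, and the NuGet reference assumes dotnet fsi.

```fsharp
#r "nuget: Hopac" // assumption: F# script run with `dotnet fsi`
open Hopac
open Hopac.Infixes

// Filled by the watcher once the nack has been observed.
let nackSeen = IVar<unit> ()

let never: Alt<string> = Alt.never ()

// An alternative that never becomes available, but watches its own nack.
let loser: Alt<string> =
  Alt.withNackJob (fun nack ->
    Job.start (nack ^=> fun () -> IVar.fill nackSeen ()) >>-. never)

let winner: Alt<string> = Alt.always "won"

// The choice commits to winner, so loser is negatively acknowledged.
let result = run (Alt.choose [loser; winner])

// Blocks until the nack has been filled and the watcher has run.
let observed: unit = run (IVar.read nackSeen)
```

The server in the dictionary plays the role of the watcher here: it waits on either the reply being taken or the nack, and undoes its side-effect in the latter case.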

The other channels do very similar things, so you’ll be able to take it from here!

The single funky thing left to explain is the >>-. operator. So, remember that >>= was called bind and lets you run a function on the result of something monadic? The double arrow (like ^=> above) means that that function should return something monadic in turn. Like ((:Job<'a>) >>= (fn: 'a -> Job<'b>)): Job<'b> or ((:Alt<'a>) ^=> (fn: 'a -> #Job<'b>)): Alt<'b>, the signature of >>- “is” ((:Job<'a>) >>- (fn: 'a -> 'b)): Job<'b>. In other words, it’s a map functor/function!

The same pattern, where - means map and = means bind, is used throughout Hopac.

Furthermore, in functional programming, if you have two things you want to execute, like Job<'a> and Job<'b>, and you want to (monadically) execute Job<'a> before Job<'b>, but you don’t care about the value 'a inside the first job, you throw it away with a dot appended to the operator.

let a: Job<'a> = ...
let b: Job<'b> = ...
let res: Job<'b> = a >>=. b // Jobs being cold lets us compose them

Similarly, if b is just some value, that you want to make the value of the returned job:

let a: Job<'a> = ...
let b: int = 35
let res: Job<int> = a >>-. b

Easy to remember, now that you know what the different dots and arrows mean, right?
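Here are the four operators side by side, as a sketch on trivial jobs. The values are arbitrary and the NuGet reference assumes dotnet fsi.

```fsharp
#r "nuget: Hopac" // assumption: F# script run with `dotnet fsi`
open Hopac
open Hopac.Infixes

let a: Job<int> = Job.result 1

// >>-  : map, the callback returns a plain value
let mapped = run (a >>- fun x -> x + 1)

// >>=  : bind, the callback returns a job
let bound = run (a >>= fun x -> Job.result (x * 10))

// >>-. : run a, throw its result away, return the given value
let replaced = run (a >>-. 35)

// >>=. : run a, throw its result away, then run the given job
let sequenced = run (a >>=. Job.result "b")
```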

So what does this look like in action? Glad you asked!

#r "Hopac.Core.dll"
#r "Hopac.dll"
#load "ThreadSafeDictionary.fs"
open Hopac

let tsd: ThreadSafeDictionary<string, string> = ThreadSafeDictionary.create() |> run
(*
val tsd : ThreadSafeDictionary<string,string> =
  {tryAddCh =
  Hopac.Ch`1[System.Tuple`4[System.String,Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Core.Unit,System.String],Hopac.Ch`1[System.String],Hopac.Promise`1[Microsoft.FSharp.Core.Unit]]];
 tryAddSelectCh =
  Hopac.Ch`1[System.Tuple`5[System.String,Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Core.Unit,System.String],Microsoft.FSharp.Core.FSharpFunc`2[System.String,System.Object],Hopac.Ch`1[System.Object],Hopac.Promise`1[Microsoft.FSharp.Core.Unit]]];
 tryRemoveCh =
  Hopac.Ch`1[System.Tuple`3[System.String,Hopac.Ch`1[Microsoft.FSharp.Core.FSharpOption`1[System.String]],Hopac.Promise`1[Microsoft.FSharp.Core.Unit]]];}
*)

let i = ref 0
// val i : int ref = {contents = 0;}

let factory = fun () -> i := !i + 1; sprintf "exists%i" (!i)
// val factory : unit -> string

tsd |> ThreadSafeDictionary.tryFindAdd "haf" factory |> run
//val it : string = "exists1"
tsd |> ThreadSafeDictionary.tryFindAdd "haf" factory |> run
//val it : string = "exists1"

tsd |> ThreadSafeDictionary.tryFindAddSelect "haf" factory (fun str -> "It " + str) |> run
// val it : string = "It exists1"
tsd |> ThreadSafeDictionary.tryFindAddSelect "haf" factory (fun str -> "It " + str) |> run
// val it : string = "It exists1"

tsd |> ThreadSafeDictionary.tryRemove "haf" |> run
// val it : string option = Some "exists1"
tsd |> ThreadSafeDictionary.tryRemove "haf" |> run
// val it : string option = None

tsd |> ThreadSafeDictionary.tryFindAdd "haf" factory |> run
// val it : string = "exists2"

tsd |> ThreadSafeDictionary.tryRemove "haf" |> run
// val it : string option = Some "exists2"

/Henrik

Twitter @henrikfeldt

Twitter @logarylib

Henrik Feldt +46 737 53 27 18 haf