Running computationally intensive code outside of the Hopac scheduler

Hopac uses a bounded pool of worker threads whose size, by default, equals the number of CPU cores. The danger of this design is that all the worker threads can end up busy with CPU-intensive work, so that no other Hopac jobs can proceed. A good solution is to run such CPU-bound computations on the standard .NET thread pool, freeing the Hopac pool for more intelligent work. I found a nice piece of code in one of the older Hopac GitHub discussions that schedules an ordinary function on the ThreadPool and represents the result as a Hopac job.

Here is a test with explanations:

open System
open System.Threading
open Printf
open Hopac
open Hopac.Infixes

[<AutoOpen>]
module Utils =
    let inline (^) x = (<|) x

    // CPU-bound work: naive recursive Fibonacci
    let compute() =
        let rec fib n = if n < 2 then n else fib (n - 1) + fib (n - 2)
        fib 44 |> ignore

    // prints a timestamped message together with the current thread's name
    let inline print fmt =
        kprintf (fun x ->
            let tname = match Thread.CurrentThread.Name with null -> "null" | x -> x
            Console.WriteLine (sprintf "%O [Thread %s] %s" DateTime.Now tname x)) fmt

    type ThreadPool with
        // runs `op` on the .NET `ThreadPool` and exposes its result (or exception) as a Hopac job
        static member runAsJob (op: unit -> 'x) : Job<'x> =
            Job.Scheduler.get() >>= fun sr ->
            let rV = IVar()
            ThreadPool.QueueUserWorkItem (fun _ ->
                Scheduler.start sr (try rV *<= op () with e -> rV *<=! e))
            |> ignore
            rV

// forever-running server that prints incoming messages to the console
let server() =
    let ch = Ch<int>()
    Job.delay ^ fun _ -> ch >>- print "Got %d request"
    |> Job.foreverServer
    |> queue
    ch

// job that schedules the `compute()` function to run on the standard .NET `ThreadPool`
let threadPoolJob = ThreadPool.runAsJob ^ fun _ ->
    print "start"
    compute()
    print "done."

// "normal" Hopac job, i.e. it performs `compute()` on the Hopac global scheduler (thread pool)
let hopacJob =
    job {
        print "start"
        compute()
        print "done."
    }

// spawn the printing server and get a channel you can send messages to
let req = server()

// spawn another server, which sends an increasing number to the above server every second
Job.iterateServer 0 ^ fun i ->
    req *<- i >>=. timeOutMillis 1000 >>-. i + 1
|> queue

// enqueue as many jobs as there are CPU cores...
// ...in the standard `ThreadPool`...
for _i in 1..Environment.ProcessorCount do queue threadPoolJob
// ...or in the Hopac pool
for _i in 1..Environment.ProcessorCount do queue hopacJob
****************** Running in Hopac pool ******************
11.06.2016 22:04:23 [Thread Hopac.Worker.0/8] Got 0 request
11.06.2016 22:04:24 [Thread Hopac.Worker.2/8] Got 1 request
11.06.2016 22:04:25 [Thread Hopac.Worker.2/8] Got 2 request
11.06.2016 22:04:26 [Thread Hopac.Worker.2/8] Got 3 request
11.06.2016 22:04:27 [Thread Hopac.Worker.0/8] Got 4 request
11.06.2016 22:04:28 [Thread Hopac.Worker.2/8] Got 5 request
11.06.2016 22:04:28 [Thread Hopac.Worker.7/8] start
11.06.2016 22:04:28 [Thread Hopac.Worker.2/8] start
11.06.2016 22:04:28 [Thread Hopac.Worker.5/8] start
11.06.2016 22:04:28 [Thread Hopac.Worker.4/8] start
11.06.2016 22:04:28 [Thread Hopac.Worker.3/8] start
11.06.2016 22:04:28 [Thread Hopac.Worker.1/8] start
11.06.2016 22:04:28 [Thread Hopac.Worker.0/8] start
11.06.2016 22:04:28 [Thread Hopac.Worker.6/8] start
// here is a 6-second gap during which the printing server received no messages,
// because all Hopac worker threads were busy doing the calculation.
11.06.2016 22:04:34 [Thread Hopac.Worker.1/8] done.
11.06.2016 22:04:34 [Thread Hopac.Worker.1/8] Got 6 request
11.06.2016 22:04:34 [Thread Hopac.Worker.7/8] done.
11.06.2016 22:04:34 [Thread Hopac.Worker.0/8] done.
11.06.2016 22:04:34 [Thread Hopac.Worker.3/8] done.
11.06.2016 22:04:35 [Thread Hopac.Worker.4/8] done.
11.06.2016 22:04:35 [Thread Hopac.Worker.5/8] done.
11.06.2016 22:04:35 [Thread Hopac.Worker.2/8] done.
11.06.2016 22:04:35 [Thread Hopac.Worker.6/8] done.
11.06.2016 22:04:35 [Thread Hopac.Worker.7/8] Got 7 request
11.06.2016 22:04:36 [Thread Hopac.Worker.6/8] Got 8 request
11.06.2016 22:04:37 [Thread Hopac.Worker.6/8] Got 9 request
11.06.2016 22:04:38 [Thread Hopac.Worker.6/8] Got 10 request
11.06.2016 22:04:39 [Thread Hopac.Worker.6/8] Got 11 request
****************** Running in .NET `ThreadPool` ******************
11.06.2016 22:08:47 [Thread Hopac.Worker.2/8] Got 1 request
11.06.2016 22:08:48 [Thread Hopac.Worker.2/8] Got 2 request
11.06.2016 22:08:49 [Thread Hopac.Worker.2/8] Got 3 request
11.06.2016 22:08:50 [Thread Hopac.Worker.4/8] Got 4 request
11.06.2016 22:08:50 [Thread null] start
11.06.2016 22:08:50 [Thread null] start
11.06.2016 22:08:50 [Thread null] start
11.06.2016 22:08:50 [Thread null] start
11.06.2016 22:08:50 [Thread null] start
11.06.2016 22:08:50 [Thread null] start
11.06.2016 22:08:50 [Thread null] start
11.06.2016 22:08:50 [Thread null] start
// although all the computations have started, the printing server keeps receiving messages.
11.06.2016 22:08:51 [Thread Hopac.Worker.6/8] Got 5 request
11.06.2016 22:08:52 [Thread Hopac.Worker.6/8] Got 6 request
11.06.2016 22:08:53 [Thread Hopac.Worker.5/8] Got 7 request
11.06.2016 22:08:54 [Thread Hopac.Worker.2/8] Got 8 request
11.06.2016 22:08:55 [Thread Hopac.Worker.2/8] Got 9 request
11.06.2016 22:08:56 [Thread Hopac.Worker.5/8] Got 10 request
11.06.2016 22:08:57 [Thread Hopac.Worker.2/8] Got 11 request
11.06.2016 22:08:57 [Thread null] done.
11.06.2016 22:08:57 [Thread null] done.
11.06.2016 22:08:57 [Thread null] done.
11.06.2016 22:08:58 [Thread null] done.
11.06.2016 22:08:58 [Thread null] done.
11.06.2016 22:08:58 [Thread null] done.
11.06.2016 22:08:58 [Thread null] done.
11.06.2016 22:08:58 [Thread null] done.
11.06.2016 22:08:58 [Thread Hopac.Worker.6/8] Got 12 request
11.06.2016 22:08:59 [Thread Hopac.Worker.2/8] Got 13 request
11.06.2016 22:09:00 [Thread Hopac.Worker.7/8] Got 14 request
11.06.2016 22:09:01 [Thread Hopac.Worker.7/8] Got 15 request
11.06.2016 22:09:02 [Thread Hopac.Worker.1/8] Got 16 request
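
For comparison, the same idea can be sketched with a .NET task instead of a hand-written `IVar`. This is not the code from the GitHub discussion: `runOnThreadPool` is a hypothetical helper name, and the sketch assumes an F# script that pulls in Hopac through `#r "nuget: Hopac"`. `Task.Run` queues the function on the .NET thread pool, and `Job.fromTask` exposes the resulting task as a Hopac job.

```fsharp
#r "nuget: Hopac" // assumption: run as an F# script with NuGet access

open System
open System.Threading.Tasks
open Hopac

// Hypothetical helper (not from the original post): `Task.Run` executes `op`
// on the .NET thread pool; `Job.fromTask` starts the task when the job runs
// and completes the job with the task's result.
let runOnThreadPool (op: unit -> 'x) : Job<'x> =
    Job.fromTask (fun () -> Task.Run<'x>(Func<'x>(op)))

// small CPU-bound function, same shape as `compute()` in the test
let rec fib n = if n < 2 then n else fib (n - 1) + fib (n - 2)

// block the calling thread until the job has produced its result
let answer = runOnThreadPool (fun () -> fib 20) |> run
printfn "fib 20 = %d" answer
```

Whether this behaves the same as `ThreadPool.runAsJob` with respect to which scheduler the continuation resumes on is something I have not measured, so treat it as a starting point rather than a drop-in replacement.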
