Running computationally intensive code outside of the Hopac scheduler
Hopac uses a bounded pool of worker threads whose size equals the number of CPU cores (by default). A danger of this design is that all the threads can end up busy doing CPU-intensive work, so that no other Hopac jobs can proceed. A good solution is to run such CPU-bound computations on the standard .NET thread pool, freeing the Hopac pool for more intelligent work. I found some nice code in one of the older Hopac GitHub discussions that schedules an ordinary function on the ThreadPool and represents the result as a Hopac job.
Here is a test with explanations:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Helpers shared by the demo below; auto-opened so they can be used unqualified.
[<AutoOpen>]
module Utils =
    /// Application operator: `f ^ x` is `f <| x`.
    /// Used below to pass a trailing lambda without wrapping it in parentheses.
    let inline (^) f x = f x

    /// CPU-bound workload: naively computes the 44th Fibonacci number and
    /// throws the result away.
    let compute () =
        let rec fib k =
            if k < 2 then k
            else fib (k - 1) + fib (k - 2)
        ignore (fib 44)

    /// printf-style logger. Each message is written to the console as one line
    /// prefixed with the current time and the name of the calling thread
    /// ("null" when the thread has no name).
    let inline print fmt =
        let emit (message: string) =
            let threadName =
                match Thread.CurrentThread.Name with
                | null -> "null"
                | name -> name
            Console.WriteLine (sprintf "%O [Thread %s] %s" DateTime.Now threadName message)
        kprintf emit fmt

    type ThreadPool with
        /// Runs `op` as a .NET `ThreadPool` work item and exposes its outcome
        /// as a Hopac job: the job yields the value returned by `op`, or
        /// fails with the exception `op` raised.
        static member runAsJob (op: unit -> 'x) : Job<'x> =
            Job.Scheduler.get() >>= fun scheduler ->
                // Write-once variable through which the pool thread hands the outcome back.
                let outcome = IVar()
                // `op ()` is evaluated on the pool thread, while the fill job is being
                // built; only the (cheap) fill itself is started on the Hopac scheduler.
                ThreadPool.QueueUserWorkItem (fun _ ->
                    Scheduler.start scheduler (try outcome *<= op () with e -> outcome *<=! e))
                |> ignore
                outcome
// Starts a server that runs forever, printing every message it receives to the
// console, and returns the channel on which the server accepts messages.
let server() =
    let requests = Ch<int>()
    // One iteration of the server: take a single message and print it.
    let handleOne = Job.delay (fun () -> requests >>- print "Got %d request")
    queue (Job.foreverServer handleOne)
    requests
// Job that schedules `compute()` to run on the standard .NET `ThreadPool`.
// Both progress messages go through `print` (not `printfn`) so that each line
// carries the timestamp and the name of the thread that produced it.
let threadPoolJob = ThreadPool.runAsJob ^ fun _ ->
    print "start"
    compute()
    print "done."
// "Normal" Hopac job, i.e. it performs `compute()` on the Hopac global
// scheduler (thread pool), logging before and after the computation.
let hopacJob =
    Job.thunk ^ fun () ->
        print "start"
        compute()
        print "done."
// Spawn the printing server and get the channel on which it accepts messages.
let req = server()

// Spawn another server that sends an increasing number to the printing server,
// waiting 1000 ms after each send.
Job.iterateServer 0 ^ fun i ->
    req *<- i >>=. timeOutMillis 1000 >>-. i + 1
|> queue

// Enqueue as many jobs as there are CPU cores...
// NOTE(review): the sample output below is split into two runs (one per pool),
// so presumably only one of the two loops was enabled at a time — confirm when
// reproducing.

// ...on the standard .NET `ThreadPool`, and...
for _i in 1..Environment.ProcessorCount do queue threadPoolJob

// ...on the Hopac pool. The count is taken from `Environment.ProcessorCount`
// rather than hard-coded, so the jobs match the number of cores on any machine.
for _i in 1..Environment.ProcessorCount do queue hopacJob
****************** Running in Hopac pool ****************** | |
11.06.2016 22:04:23 [Thread Hopac.Worker.0/8] Got 0 request | |
11.06.2016 22:04:24 [Thread Hopac.Worker.2/8] Got 1 request | |
11.06.2016 22:04:25 [Thread Hopac.Worker.2/8] Got 2 request | |
11.06.2016 22:04:26 [Thread Hopac.Worker.2/8] Got 3 request | |
11.06.2016 22:04:27 [Thread Hopac.Worker.0/8] Got 4 request | |
11.06.2016 22:04:28 [Thread Hopac.Worker.2/8] Got 5 request | |
11.06.2016 22:04:28 [Thread Hopac.Worker.7/8] start | |
11.06.2016 22:04:28 [Thread Hopac.Worker.2/8] start | |
11.06.2016 22:04:28 [Thread Hopac.Worker.5/8] start | |
11.06.2016 22:04:28 [Thread Hopac.Worker.4/8] start | |
11.06.2016 22:04:28 [Thread Hopac.Worker.3/8] start | |
11.06.2016 22:04:28 [Thread Hopac.Worker.1/8] start | |
11.06.2016 22:04:28 [Thread Hopac.Worker.0/8] start | |
11.06.2016 22:04:28 [Thread Hopac.Worker.6/8] start | |
// Here is a 6-second gap during which the printing server was not receiving messages,
// because all Hopac worker threads were busy doing the calculation.
11.06.2016 22:04:34 [Thread Hopac.Worker.1/8] done. | |
11.06.2016 22:04:34 [Thread Hopac.Worker.1/8] Got 6 request | |
11.06.2016 22:04:34 [Thread Hopac.Worker.7/8] done. | |
11.06.2016 22:04:34 [Thread Hopac.Worker.0/8] done. | |
11.06.2016 22:04:34 [Thread Hopac.Worker.3/8] done. | |
11.06.2016 22:04:35 [Thread Hopac.Worker.4/8] done. | |
11.06.2016 22:04:35 [Thread Hopac.Worker.5/8] done. | |
11.06.2016 22:04:35 [Thread Hopac.Worker.2/8] done. | |
11.06.2016 22:04:35 [Thread Hopac.Worker.6/8] done. | |
11.06.2016 22:04:35 [Thread Hopac.Worker.7/8] Got 7 request | |
11.06.2016 22:04:36 [Thread Hopac.Worker.6/8] Got 8 request | |
11.06.2016 22:04:37 [Thread Hopac.Worker.6/8] Got 9 request | |
11.06.2016 22:04:38 [Thread Hopac.Worker.6/8] Got 10 request | |
11.06.2016 22:04:39 [Thread Hopac.Worker.6/8] Got 11 request | |
****************** Running in .NET `ThreadPool` ****************** | |
11.06.2016 22:08:47 [Thread Hopac.Worker.2/8] Got 1 request | |
11.06.2016 22:08:48 [Thread Hopac.Worker.2/8] Got 2 request | |
11.06.2016 22:08:49 [Thread Hopac.Worker.2/8] Got 3 request | |
11.06.2016 22:08:50 [Thread Hopac.Worker.4/8] Got 4 request | |
11.06.2016 22:08:50 [Thread null] start | |
11.06.2016 22:08:50 [Thread null] start | |
11.06.2016 22:08:50 [Thread null] start | |
11.06.2016 22:08:50 [Thread null] start | |
11.06.2016 22:08:50 [Thread null] start | |
11.06.2016 22:08:50 [Thread null] start | |
11.06.2016 22:08:50 [Thread null] start | |
11.06.2016 22:08:50 [Thread null] start | |
// Although all the computations have started, the printing server keeps receiving messages.
11.06.2016 22:08:51 [Thread Hopac.Worker.6/8] Got 5 request | |
11.06.2016 22:08:52 [Thread Hopac.Worker.6/8] Got 6 request | |
11.06.2016 22:08:53 [Thread Hopac.Worker.5/8] Got 7 request | |
11.06.2016 22:08:54 [Thread Hopac.Worker.2/8] Got 8 request | |
11.06.2016 22:08:55 [Thread Hopac.Worker.2/8] Got 9 request | |
11.06.2016 22:08:56 [Thread Hopac.Worker.5/8] Got 10 request | |
11.06.2016 22:08:57 [Thread Hopac.Worker.2/8] Got 11 request | |
11.06.2016 22:08:57 [Thread null] done. | |
11.06.2016 22:08:57 [Thread null] done. | |
11.06.2016 22:08:57 [Thread null] done. | |
11.06.2016 22:08:58 [Thread null] done. | |
11.06.2016 22:08:58 [Thread null] done. | |
11.06.2016 22:08:58 [Thread null] done. | |
11.06.2016 22:08:58 [Thread null] done. | |
11.06.2016 22:08:58 [Thread null] done. | |
11.06.2016 22:08:58 [Thread Hopac.Worker.6/8] Got 12 request | |
11.06.2016 22:08:59 [Thread Hopac.Worker.2/8] Got 13 request | |
11.06.2016 22:09:00 [Thread Hopac.Worker.7/8] Got 14 request | |
11.06.2016 22:09:01 [Thread Hopac.Worker.7/8] Got 15 request | |
11.06.2016 22:09:02 [Thread Hopac.Worker.1/8] Got 16 request |
Comments