Parallel reduce: Hopac, Asyncs, Tasks and Scala's Futures
We translate the original code almost as is to Tasks and Hopac:
```fsharp
open Hopac
open Hopac.Job.Infixes
open System.Threading.Tasks
open System

let reduceParallelTasks<'a> f (ie: 'a array) =
    let rec reduceRec f (ie: 'a array) =
        match ie.Length with
        | 1 -> ie.[0]
        | 2 -> f ie.[0] ie.[1]
        | len ->
            let h = len / 2
            // Run each half as a task; .Result blocks until that task completes
            let o1 = Task.Run(fun _ -> reduceRec f ie.[0..h - 1])
            let o2 = Task.Run(fun _ -> reduceRec f ie.[h..])
            f o1.Result o2.Result
    match ie.Length with
    | 0 -> failwith "Sequence contains no elements"
    | _ -> reduceRec f ie

let reduceParallelAsync<'a> f (ie: 'a array) =
    let rec reduceRec f (ie: 'a array) =
        async {
            match ie.Length with
            | 1 -> return ie.[0]
            | 2 -> return f ie.[0] ie.[1]
            | len ->
                let h = len / 2
                // Start the left half as a child, compute the right half, then await the left
                let! o1a = Async.StartChild <| reduceRec f ie.[0..h - 1]
                let! o2 = reduceRec f ie.[h..]
                let! o1 = o1a
                return f o1 o2
        }
    match ie.Length with
    | 0 -> failwith "Sequence contains no elements"
    | _ -> Async.RunSynchronously <| reduceRec f ie

let reduceParallelHopac<'a> f (a: 'a array) =
    let rec reduceRec (f, ie: 'a array) =
        match ie.Length with
        | 1 -> Job.result ie.[0]
        | 2 -> Job.result (f ie.[0] ie.[1])
        | len ->
            let h = len / 2
            // <*> runs both jobs (possibly in parallel) and pairs their results;
            // Job.delayWith defers building the right-hand job until it is run
            reduceRec (f, ie.[0..h - 1]) <*> Job.delayWith reduceRec (f, ie.[h..]) |>> fun (x, y) -> f x y
    match a.Length with
    | 0 -> failwith "Sequence contains no elements"
    | _ -> run <| reduceRec (f, a)

// Force several full collections and pause between benchmarks
let cleanup() =
    for _ in 1..5 do
        GC.Collect ()
        Threading.Thread.Sleep 50

let a = [| 1L..5000000L |]

// Run in F# Interactive after #time "on"; it prints Real/CPU/GC figures for each line
Array.reduce (+) a
cleanup()
reduceParallelTasks (+) a
cleanup()
reduceParallelHopac (+) a
cleanup()
reduceParallelAsync (+) a
```
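The Real/CPU/GC figures in the results have the format printed by F# Interactive's `#time` directive. To take comparable measurements outside F# Interactive, a minimal Stopwatch-based helper could be used. This is only a sketch: `timeIt` is a hypothetical name, and it measures a single run rather than doing any warm-up or averaging.

```fsharp
open System
open System.Diagnostics

/// Hypothetical helper: runs `f` once and prints wall-clock time, process
/// CPU time and GC collection counts per generation, similar to the figures
/// #time prints in F# Interactive. Returns f's result.
let timeIt (label: string) (f: unit -> 'a) : 'a =
    let gcBefore = [| for g in 0 .. 2 -> GC.CollectionCount g |]
    let proc = Process.GetCurrentProcess()
    let cpuBefore = proc.TotalProcessorTime
    let sw = Stopwatch.StartNew()
    let result = f ()
    sw.Stop()
    proc.Refresh()   // re-read process information before sampling CPU time again
    let cpu = proc.TotalProcessorTime - cpuBefore
    let gcs = [| for g in 0 .. 2 -> GC.CollectionCount g - gcBefore.[g] |]
    printfn "%s: Real: %O, CPU: %O, GC gen0: %d, gen1: %d, gen2: %d"
        label sw.Elapsed cpu gcs.[0] gcs.[1] gcs.[2]
    result
```

For example, `timeIt "Hopac" (fun () -> reduceParallelHopac (+) a)` would time the Hopac version on the array defined above.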
And Scala's Futures:
```scala
import concurrent.duration.Duration
import concurrent.{Await, Future}
import concurrent.ExecutionContext.Implicits.global

object ParallelReduce {
  def reduce[A](f: (A, A) => A, a: Array[A]): A = {
    def reduceRec(a: Array[A]): Future[A] = {
      a.length match {
        case 1 => Future(a(0))
        case 2 => Future {
          f(a(0), a(1))
        }
        case len => {
          val h = len / 2
          // Create the left future first so it is already running while the right half is built
          val o1a = reduceRec(a.slice(0, h))
          for {
            o2 <- reduceRec(a.slice(h, a.length))
            o1 <- o1a
          } yield f(o1, o2)
        }
      }
    }
    a match {
      case Array() => throw new Exception("Sequence contains no elements")
      case _ => Await.result(reduceRec(a), Duration.Inf)
    }
  }

  def run() = {
    // Array.range excludes its upper bound, so this is 1 to 4999999.
    // Note: the Int sum overflows here (the F# version uses int64).
    val a = Array.range(1, 5000000)
    // Util.time is a timing helper that is not shown in the post
    Util.time { reduce((x: Int, y: Int) => x + y, a) }
  }
}
```
The results (Core i5, 4 cores):
- Sequential Array.reduce: Real: 00:00:00.014, CPU: 00:00:00.015, GC gen0: 0, gen1: 0, gen2: 0
- Tasks: Real: 00:00:01.790, CPU: 00:00:05.678, GC gen0: 36, gen1: 10, gen2: 1
- Hopac: Real: 00:00:00.514, CPU: 00:00:01.482, GC gen0: 27, gen1: 2, gen2: 1
- Asyncs: Real: 00:00:37.872, CPU: 00:01:48.405, GC gen0: 90, gen1: 29, gen2: 4
- Scala Futures: 4.8 seconds
Hopac is ~3.5 times faster than TPL (0.514 s vs 1.790 s). And what's wrong with Asyncs? They are more than 20 times slower than Tasks, and I don't know why. Maybe they are not intended for highly concurrent scenarios, or maybe my code is not the most efficient. Any ideas, guys?
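One plausible factor, not something the measurements above establish: every version keeps splitting until a range has at most two elements, so it creates on the order of n/2 child computations for n elements, and each level of recursion copies the array through slicing. A common mitigation is to stop forking below a size threshold and to recurse on index bounds instead of sub-arrays. Below is a minimal sketch of that idea for Asyncs, assuming `f` is associative; `reduceParallelAsyncCutoff` and its `threshold` parameter are hypothetical, and no timings are claimed for it.

```fsharp
open System

/// Hypothetical variant of reduceParallelAsync: recursion works on index
/// bounds (no sub-array copies) and stops forking once a range has at most
/// `threshold` elements, reducing that range sequentially instead.
/// Assumes `f` is associative.
let reduceParallelAsyncCutoff (threshold: int) (f: 'a -> 'a -> 'a) (arr: 'a array) : 'a =
    if arr.Length = 0 then invalidArg "arr" "Sequence contains no elements"
    let threshold = max 1 threshold   // guarantees every leaf range is non-empty
    // Sequentially reduces arr.[lo .. hi - 1]; requires lo < hi
    let reduceRange lo hi =
        let mutable acc = arr.[lo]
        for i in lo + 1 .. hi - 1 do
            acc <- f acc arr.[i]
        acc
    let rec go lo hi =
        async {
            if hi - lo <= threshold then
                return reduceRange lo hi
            else
                let mid = lo + (hi - lo) / 2
                // Fork the left half as a child, compute the right half here
                let! leftChild = Async.StartChild(go lo mid)
                let! right = go mid hi
                let! left = leftChild
                return f left right
        }
    Async.RunSynchronously(go 0 arr.Length)
```

With `threshold = 1` this forks at almost every step, much like the original; with a threshold in the thousands the number of child computations drops accordingly. The best value would have to be found by measuring.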
Let's test the leaders on larger arrays:
(Hopac is 3.37 times faster, Scala is 1.5 times slower)
(Hopac is 5.25 times faster, Scala is 1.05 times slower)
Comments
For high parallelism, use e.g. Array.Parallel.
For example, with an array of just 5000 items and a reducer that sleeps for 30 ms on every call:
```fsharp
Array.reduce (fun c x -> System.Threading.Thread.Sleep 30; x + c) a
// Real: 00:02:36.232
reduceParallelTasks (fun c x -> System.Threading.Thread.Sleep 30; x + c) a
// Real: 00:00:06.730
reduceParallelAsync (fun c x -> System.Threading.Thread.Sleep 30; x + c) a
// Real: 00:00:05.361
reduceParallelHopac (fun c x -> System.Threading.Thread.Sleep 30; x + c) a
// Real: 00:00:20.032
```
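One way the Array.Parallel suggestion could be applied to a reduce is sketched below, assuming `f` is associative. The array is split into roughly one chunk per logical processor with `Array.chunkBySize`, each chunk is reduced sequentially inside `Array.Parallel.map`, and the partial results are combined in order. `reduceChunked` is a hypothetical name, and this sketch is not one of the benchmarked versions.

```fsharp
open System

/// Hypothetical helper: reduces `arr` by splitting it into about one chunk
/// per logical processor, reducing the chunks in parallel with
/// Array.Parallel.map, then combining the partial results sequentially.
/// Assumes `f` is associative; chunk order is preserved.
let reduceChunked (f: 'a -> 'a -> 'a) (arr: 'a array) : 'a =
    if arr.Length = 0 then invalidArg "arr" "Sequence contains no elements"
    let chunkCount = Environment.ProcessorCount
    // Ceiling division, so at most chunkCount chunks are produced
    let chunkSize = (arr.Length + chunkCount - 1) / chunkCount
    arr
    |> Array.chunkBySize chunkSize
    |> Array.Parallel.map (Array.reduce f)   // one sequential reduce per chunk
    |> Array.reduce f                        // combine the partial results in order
```

Because only a handful of work items are created, the scheduling overhead stays small regardless of array length, at the cost of less even load balancing when `f` has very uneven cost per element.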