Parallel doseq for Clojure
OK, I think what I want is to have an agent for each loop, with the data sent to the agent using send. The agents triggered using send are run from a thread pool, so the number is limited in some way (it doesn't give the fine-grained control of exactly three threads, but it'll have to do for now).
[Dave Ray explains in comments: to control pool size I'd need to write my own]
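For what it's worth, "write my own" could look roughly like the following. This is only a sketch that swaps the agents for a plain java.util.concurrent fixed-size pool; run-in-pool is a made-up helper name, not something from the question or from Clojure itself.

```clojure
(import '(java.util.concurrent Executors ExecutorService Future))

(defn run-in-pool
  "Calls (f x) for every x in coll on a pool of exactly n threads and
  blocks until all calls have finished. Hypothetical helper for illustration."
  [n f coll]
  (let [^ExecutorService pool (Executors/newFixedThreadPool n)]
    (try
      ;; Clojure functions implement Callable, so they can be submitted as-is.
      ;; invokeAll blocks until every task has completed.
      (let [tasks   (mapv (fn [x] (fn [] (f x))) coll)
            futures (.invokeAll pool tasks)]
        ;; .get returns each result (in input order) and rethrows task
        ;; failures wrapped in an ExecutionException.
        (mapv (fn [^Future fut] (.get fut)) futures))
      (finally
        ;; Always release the pool's threads, even if a task failed.
        (.shutdown pool)))))

;; At most three calls run at the same time.
(run-in-pool 3 #(* % %) (range 7 11))
;; => [49 64 81 100]
```

The trade-off compared with agents is that the pool has to be created and shut down explicitly, but the thread count is exactly what you ask for.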
(defmacro dopar [seq-expr & body]
  (assert (= 2 (count seq-expr)) "single pair of forms in sequence expression")
  (let [[k v] seq-expr]
    `(apply await
            (for [k# ~v]
              (let [a# (agent k#)]
                (send a# (fn [~k] ~@body))
                a#)))))
which can be used like:
(deftest test-dump
  (dopar [n (range 7 11)]
    (time (do-dump-single "/tmp/single" "a" n 10000000))))
Yay! Works! I rock! (OK, Clojure rocks a little bit too). Related blog post.
pmap will actually work fine in most circumstances - it uses a thread pool with a sensible number of threads for your machine. I wouldn't bother trying to create your own mechanisms to control the number of threads unless you have real benchmark evidence that the defaults are causing a problem.
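As a minimal sketch of the plain pmap route (slow-square is just a made-up stand-in for the real work, such as the dump function in the question):

```clojure
(defn slow-square
  "Stand-in for real work: sleeps briefly, then squares n."
  [n]
  (Thread/sleep 100)
  (* n n))

;; pmap returns results in input order; doall forces the lazy sequence
;; so that all the work has finished before we carry on.
(def squares (doall (pmap slow-square (range 7 11))))
;; squares => (49 64 81 100)
```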
Having said that, if you really want to limit to a maximum of three threads, an easy approach is to just use pmap on 3 subsets of the range:
(defn split-equally
  "Split a collection into a vector of (as close as possible) equally sized parts"
  [num coll]
  (loop [num num
         parts []
         coll coll
         c (count coll)]
    (if (<= num 0)
      parts
      (let [t (quot (+ c num -1) num)]
        (recur (dec num) (conj parts (take t coll)) (drop t coll) (- c t))))))

(defmacro dopar [thread-count [sym coll] & body]
  `(doall
     (pmap (fn [vals#] (doseq [~sym vals#] ~@body))
           (split-equally ~thread-count ~coll))))
Note the use of doall, which is needed to force evaluation of the pmap (which is lazy).
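If the body is run purely for its side effects and you don't need the results, dorun should do the same job as doall without holding on to the realized sequence. A small sketch, where processed and process! are made-up names for illustration:

```clojure
(def processed (atom #{}))

(defn process!
  "Hypothetical side-effecting task: records that n was handled."
  [n]
  (swap! processed conj n))

;; pmap only guarantees that every element has been processed once the
;; whole sequence has been walked. dorun walks it and discards the results.
(dorun (pmap process! (range 100)))

(count @processed)
;; => 100
```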
There's actually a library now for doing exactly this. From their GitHub:
The claypoole library provides threadpool-based parallel versions of Clojure functions such as pmap, future, and for.
It provides both ordered/unordered versions for the same.
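A rough sketch of what limiting the work to exactly three threads might look like with claypoole. It assumes the com.climate/claypoole dependency is on the classpath; square is a made-up stand-in for the real work, so check the library's README for the details before relying on this.

```clojure
;; Assumes the claypoole dependency (com.climate/claypoole) is available.
(require '[com.climate.claypoole :as cp])

(defn square [n] (* n n))

;; with-shutdown! creates the pool and shuts it down when the body exits.
(def squares
  (cp/with-shutdown! [pool (cp/threadpool 3)]
    ;; doall makes sure every result is realized before the pool goes away.
    (doall (cp/pmap pool square (range 7 11)))))
;; squares => (49 64 81 100)
```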