ryoakg
6/12/2016 - 1:54 PM

wait-for-async-thread-termination.clj

(set-env! :dependencies '[[org.clojure/core.async "0.2.374"]])
(require '[clojure.core.async :as async :refer [>! <! >!! <!!]]
         '[clojure.string :as str])

;;; thread/go の戻り値の channel を使う事で thread/go の終了を待つ事が出来る
(with-open [w (java.io.PipedWriter.)
            r (java.io.PipedReader. w)]
  (let [ch (async/chan 2)] ;バッファが多い場合、put したけど、thread が処理してない場合がある
    (async/thread
      (binding [*out* w]
        (loop []
          (let [c (<!! ch)]
            (when-not (nil? c)
              (print c)
              (Thread/sleep 500)        ;適当に重い処理を想定
              (recur))))))
    (dorun
     (map #(>!! ch %) "abcde"))
    (async/close! ch)
    (let [arr (char-array 20)]
      (.read r arr)
      (->> arr
           (take-while #(not= % (char 0)))
           str/join)))) ;; => "abc"
;; "de" は async/thread が 処理しきれていないので
;; ch に入ったままで (print c) できていない

(with-open [w (java.io.PipedWriter.)
            r (java.io.PipedReader. w)]
  (let [ch (async/chan 2)
        wait-ch (async/thread
                  (binding [*out* w]
                    (loop []
                      (let [c (<!! ch)]
                        (when-not (nil? c)
                          (print c)
                          (Thread/sleep 500)
                          (recur))))))]
    (dorun
     (map #(>!! ch %) "abcde"))
    (async/close! ch)
    (<!! wait-ch)                  ;ここで thread の終了を待つ様に変更
    (let [arr (char-array 20)]
      (.read r arr)
      (->> arr
           (take-while #(not= % (char 0)))
           str/join)))) ;; => "abcde"