ryoakg
3/5/2016 - 8:11 AM

Clojure core.async pipeline example

Clojure core.async pipeline example

(ns foo
  (:require [clojure.core.async :as async]
            [clj-http.client :as http]))

(comment
  :dependencies
  [[org.clojure/core.async "0.2.374"]
   [cheshire "5.5.0"]
   [clj-http "2.1.0"]
   ])

(def concurrency 5)

(let [in (async/chan)
      out (async/chan)
      request-handler (fn [url out*]
                        (async/thread
                          (println "Making request:" url)
                          (->> (http/get url {:as :json})
                               :body :items
                               (map :clone_url)
                               (map #(async/>!! out %))
                               dorun)
                          (async/close! out*)))]

  ;; Process `in` messages concurrently
  (async/pipeline-async concurrency out request-handler in)

  ;; Push URLs to process
  (async/thread
    (->> (range 1 11)
         (map #(str "https://api.github.com/search/repositories?q=language:clojure&page=" %))
         (map #(async/>!! in %))
         dorun))

  ;; Print results of processing
  (async/thread
    (loop []
      (println (async/<!! out))
      (recur))))