foxlog
3/30/2018 - 7:09 AM

pmap 用法

pmap 用法

;;
;; vessel.util.clj
;;
(ns vessel.util
  (:require [clojure.java.io :as io]
            [vessel.log      :as log]
            [clojure.string  :as string]
            [me.raynes.fs    :as fs])
  (:import java.util.zip.GZIPOutputStream))

(defn mkdir
  [dir-path]
  (.mkdirs (io/file dir-path)))

(defn dir-path
  [file-path]
  (string/join "/" (butlast (string/split file-path #"/"))))

(defn file-list
  [start-dir]
  (remove #(empty? (% 2)) (vec (fs/iterate-dir start-dir))))

(defn gzip-file
  [input output & opts]
  (do
    (log/info (str "compress" " " input))
    (with-open [output (-> output io/output-stream GZIPOutputStream.)]
      (apply io/copy (io/reader input) output opts))))

(defn gzip-and-delete
  [input output]
  (do
    (gzip-file input output))
    (fs/delete input))

(defn gzip-and-delete-files
  [inputs & opts]
  (let [outputs (map #(str % ".gz") inputs)]
    ;;pmap 
    (doall (pmap gzip-and-delete inputs outputs))))

(defn show-url
  [bucket file-key]
  (str "s3://" bucket "/" file-key))
;;
;;bigml.test.api
;; Your BigML username and api-key go here!
  (api/with-connection (api/make-connection "johndoe" "123123123")
    (let [_ (println "Building the source and dataset for iris")
          source (api/get-final (source/create "test/data/iris.csv.gz"))
          dataset (api/get-final (dataset/create source))

          _ (println "Training a pruned tree using 2/3 of the data")
          model (api/get-final (model/create dataset
                                             :sample_rate 0.66
                                             :seed "iris-test"
                                             :stat_pruning true))

          _ (println "Evaluating the model on the remaining 1/3 of the data")
          evaluation (api/get-final (evaluation/create model dataset
                                                       :sample_rate 0.66
                                                       :seed "iris-test"
                                                       :out_of_bag true))]
      (println "Test set accuracy:"
               (:accuracy (:model (:result evaluation))))

      (println "Cleaning up by deleting the resources from BigML")
      ;;pmap 并发删除多项内容
      (doall (pmap api/delete [source dataset model evaluation])))))
;;
;;httpkit.client_test.clj
;;pmap 访问url
;;
(defn- fetch-group-urls [group-idx urls]
  (let [s (System/currentTimeMillis)
        ;; 并发访问获得url内容, 然后通过doseq等待
        requests (doall (pmap get-url urls))]
    (doseq [r requests] @r) ; wait
    (info group-idx "takes time" (- (System/currentTimeMillis) s))))
;;
;; 来自Dennis http://blog.fnil.net/blog/964a2d5c851b7e5fd65b630a7ef96966/
;; 
;; 更多使用场景: 在用户首页登陆后的各种统计信息, 尽量用并发去DB查询, 避免串行查询。 
;; 比如: 用户代办信息; 站内信息; 
;;

(defn add-app-info
  "添加应用统计信息。"
  [app]
  (assoc app
         :yesterday_reqs (count-reqs app 7)
         :monthly_reqs (count-reqs app 30)
         :total_users (count-users app)))

(defn get-client-apps
  "获取用户的应用列表"
  [client_id]
  (->> client_id
       (db/find-apps-by-client-id)
       (map add-app-info)))

;; 改造获取用户的应用列表
;; 将add-app-info的串行执行改为并行执行
(defn get-client-apps
  "获取用户的应用列表"
  [client_id]
  (->> client_id
       (db/find-apps-by-client-id)
       (pmap add-app-info)))
;;
;; clojure.core
;; (pmap coll)
;; (pmap coll & colls)
(defn pmap
  "Like map, except f is applied in parallel. Semi-lazy in that the
  parallel computation stays ahead of the consumption, but doesn't
  realize the entire result unless required. Only useful for
  computationally intensive functions where the time of f dominates
  the coordination overhead."
  {:added "1.0"
   :static true}
  ([f coll]
   (let [n (+ 2 (.. Runtime getRuntime availableProcessors))
         rets (map #(future (f %)) coll)
         step (fn step [[x & xs :as vs] fs]
                (lazy-seq
                 (if-let [s (seq fs)]
                   (cons (deref x) (step xs (rest s)))
                   (map deref vs))))]
     (step rets (drop n rets))))
  ([f coll & colls]
   (let [step (fn step [cs]
                (lazy-seq
                 (let [ss (map seq cs)]
                   (when (every? identity ss)
                     (cons (map first ss) (step (map rest ss)))))))]
     (pmap #(apply f %) (step (cons coll colls))))))