3/28/2018 - 3:23 PM

future 用法 (含有CountDownLatch) core.async alts!

future 用法 (含有CountDownLatch) core.async alts!

;; core.async的alts! 实现部分 不是用Promise,有reduce,还看不明白. 
(defn do-alt
  "Builds the code form for an alt-style selection over several ports.

  `alts` is the symbol (or form) of the function to call with the vector of
  ports; `clauses` is a flat sequence of port-spec/expression pairs, optionally
  mixed with keyword/value option pairs.

  The returned form:
    1. binds every port expression (and every put value) to a fresh gensym so
       each is evaluated exactly once,
    2. calls `alts` with the vector of those gensyms plus the option pairs,
    3. dispatches with `cond` on the channel that was selected, and
    4. returns the value itself when the selected channel is :default.

  Throws AssertionError when `clauses` has an odd number of elements."
  [alts clauses]
  (assert (even? (count clauses)) "unbalanced clauses")
  (let [clauses (core/partition 2 clauses)
        ;; A pair whose first element is a keyword is an option, not a clause.
        opt? #(keyword? (first %))
        opts (filter opt? clauses)
        clauses (remove opt? clauses)
        ;; Outer reduce: walk the clauses, accumulating the rewritten clauses
        ;; and one flat list of [gensym expr] bindings shared by all of them.
        [clauses bindings]
        (core/reduce
         (fn [[clauses bindings] [ports expr]]
           (let [ports (if (vector? ports) ports [ports])
                 ;; Inner reduce: replace each port of this clause by a gensym.
                 ;; A vector port is a [port value] put and gets two gensyms.
                 [ports bindings]
                 (core/reduce
                  (fn [[ports bindings] port]
                    (if (vector? port)
                      (let [[port val] port
                            gp (gensym)
                            gv (gensym)]
                        [(conj ports [gp gv]) (conj bindings [gp port] [gv val])])
                      (let [gp (gensym)]
                        [(conj ports gp) (conj bindings [gp port])])))
                  [[] bindings] ports)]
             [(conj clauses [ports expr]) bindings]))
         [[] []] clauses)
        gch (gensym "ch")
        gret (gensym "ret")]
    `(let [~@(mapcat identity bindings)
           [val# ~gch :as ~gret] (~alts [~@(apply concat (core/map first clauses))] ~@(apply concat opts))]
       (cond
         ;; One test/result pair per clause: the test matches the selected
         ;; channel against any of the clause's ports.
         ~@(mapcat (fn [[ports expr]]
                     [`(or ~@(core/map (fn [port]
                                         `(= ~gch ~(if (vector? port) (first port) port)))
                                       ports))
                      ;; A result of the shape ([binding] body...) destructures
                      ;; the [val ch] pair; anything else is used verbatim.
                      (if (and (seq? expr) (vector? (first expr)))
                        `(let [~(first expr) ~gret] ~@(rest expr))
                        expr)])
                   clauses)
         (= ~gch :default) val#))))
;; mq push和 pull操作使用future

;; Round-trips one message from a push socket to a pull socket. The send and
;; the receive each run in their own future; only the receive is awaited.
(testing "push / pull"
  (mq/with-context context 2
    (with-open [receiver (mq/bind (mq/socket context mq/pull) url)
                sender (mq/connect (mq/socket context mq/push) url)]
      (let [payload (uuid)
            ;; Started for its side effect only; never dereferenced.
            push (future (mq/send sender payload))
            received (future (mq/recv receiver))]
        ;; Blocks until the pull side has produced a message.
        (is (= payload (String. (deref received))))))))
;; from Dennis
;; 有一段业务代码要调用一个第三方接口来查询域名备案号,但是呢,这个第三方接口非常
;; 不稳定,经常查询出错或者超时,导致这个业务经常不可用。

(defn query-icp
  "First version: answers the lookup for `domain` by delegating to
  query-icp-from-thirdparty1 and returning whatever it returns."
  [domain]
  ;; HTTP call to the third-party API.
  (query-icp-from-thirdparty1 domain))

;; 为了提高这个接口的稳定性,我们引入另一个查询服务,想让两个服务来竞争,
;; 谁能返回正常结果,就用谁的。假设这个服务封装成了函数 query-icp-from-thirdparty2  
(defn query-icp
  "Second version: tries query-icp-from-thirdparty1 first and falls back to
  query-icp-from-thirdparty2 only when the first returns nil/false.

  Returns the first truthy result, or the second service's result otherwise.
  The two calls are sequential, so a slow first service still delays the
  answer."
  [domain]
  ;; `or` short-circuits: the second service is queried only if the first
  ;; produced nothing usable. Without it the first result would be discarded.
  (or (query-icp-from-thirdparty1 domain)
      (query-icp-from-thirdparty2 domain)))
;; 最终版本:
;; 借助 future和Deliver Promise
(defn- do-query-icp
  "Runs `(f domain)` on another thread and, when the result is truthy,
  delivers it to the promise `p`.

  Returns the future running the query. A nil/false result delivers nothing,
  so `p` stays unrealized for that attempt."
  [p f domain]
  ;; The future is what makes this asynchronous: without it the query would
  ;; block the caller and several queries could not race each other.
  (future
    (when-let [ret (f domain)]
      ;; Several workers can share one promise; whichever finishes first
      ;; delivers, and later deliveries to the same promise have no effect.
      ;; Similar in spirit to core.async's alts!, though alts! is not
      ;; implemented with promises.
      (deliver p ret))))

(defn query-icp
  "Final version: hands one shared promise to a do-query-icp call per
  third-party service and returns whatever value is delivered to it.

  Waits at most 5000 ms for a delivery and returns nil on timeout."
  [domain]
  (let [p (promise)]
    (do-query-icp p query-icp-from-thirdparty1 domain)
    (do-query-icp p query-icp-from-thirdparty2 domain)
    ;; The timeout must be a number of milliseconds; the keyword :5000 would
    ;; throw a ClassCastException inside deref.
    (deref p 5000 nil)))
(defn sh
  "Launches a sub-process through Runtime.exec and waits for it to finish.

  `args` is split by parse-args into the command and an options map. The
  options read here are :env, :dir, :in, :in-enc and :out-enc.

  Returns a map with
    :exit  the value returned by Process.waitFor,
    :out   the process stdout, converted by stream-to-enc using :out-enc,
    :err   the process stderr, converted by stream-to-string."
  {:added "1.2"}
  [& args]
  (let [[cmd opts] (parse-args args)
        proc (.exec (Runtime/getRuntime)
               ^"[Ljava.lang.String;" (into-array cmd)
               (as-env-strings (:env opts))
               (as-file (:dir opts)))
        {:keys [in in-enc out-enc]} opts]
    (if in
      ;; Feed stdin from a separate thread. Writing it synchronously could
      ;; block forever if the child fills its output pipe before it has
      ;; consumed all of its input, because nothing is reading stdout yet.
      (future
        (with-open [os (.getOutputStream proc)]
          (copy in os :encoding in-enc)))
      ;; No input: close stdin right away so the child sees end-of-file.
      (.close (.getOutputStream proc)))
    (with-open [stdout (.getInputStream proc)
                stderr (.getErrorStream proc)]
      ;; Drain both streams concurrently while waiting for the exit code.
      (let [out (future (stream-to-enc stdout out-enc))
            err (future (stream-to-string stderr))
            exit-code (.waitFor proc)]
        {:exit exit-code :out @out :err @err}))))
;; korma.test
;; 这里有用到 CountDownLatch

;; Checks that with-db selects the connection per thread: two futures, each
;; bound to a different database, run at the same moment and each must empty
;; its own database.
(deftest thread-safe-with-db
  (testing "with-db using binding"
    (let [db1 (mem-db)
          db2 (mem-db-2)]
      ;; let's create some records in the first db
      (with-db db1 (populate 2))

      ;; let's create some records in the second db
      (with-db db2 (populate 2))

      (default-connection db1)

      ;; we will create 2 threads and execute them at the same time
      ;; using a CountDownLatch to synchronize their execution.
      ;; The three CountDownLatch operations used here:
      ;; 1. (CountDownLatch. 2)  creates the latch with an initial count
      ;; 2. (.countDown latch)   signals that this thread is ready
      ;; 3. (.await latch)       blocks until the count reaches zero
      (let [latch (CountDownLatch. 2)
            ;; NOTE(review): the body of each future was truncated in this
            ;; snippet. (delete address) is restored because the assertions
            ;; below expect both tables to be empty; it assumes `delete` is
            ;; referred alongside `select` -- confirm against the test ns.
            t1 (future (with-db db2
                         (.countDown latch)
                         (.await latch)
                         (delete address)))
            t2 (future (with-db db1
                         (.countDown latch)
                         (.await latch)
                         (delete address)))]
        ;; Dereferencing both futures makes the main thread wait until t1 and
        ;; t2 have finished (and rethrows anything they threw).
        @t1
        @t2
        (.await latch))

      (default-connection nil)

      (let [addresses-mem-db-1 (with-db db1
                                 (select address))
            addresses-mem-db-2 (with-db db2
                                 (select address))]
        (is (= (count addresses-mem-db-1) 0))
        (is (= (count addresses-mem-db-2) 0))))))
;; from wile.core
(defn executor
  "Returns a new thread pool created by Executors/newCachedThreadPool."
  []
  (Executors/newCachedThreadPool))

(defn notify
  "Invokes `f` with `more` as its arguments, for its side effects.

  Any Throwable raised by `f` is logged with log/error instead of propagating
  to the caller. Returns the result of `f`, or the value of the log call when
  `f` threw; callers are expected to ignore it.

  Precondition: `f` must satisfy ifn?."
  [f & more]
  {:pre [(ifn? f)]}
  ;; The catch clause must be closed before the try ends. The original nested
  ;; a no-op (finally nil) inside the catch body, where `finally` is not a
  ;; resolvable symbol, so the function did not compile.
  (try
    (apply f more)
    (catch Throwable t
      (log/error t "Exception Encountered Notifying Spy"))))

(defn notify-all
  "Calls every function in `fns` with `more` as its arguments, in order, by
  way of notify, purely for side effects. The work runs inside a future, which
  is returned immediately.

  Precondition: every element of `fns` must satisfy ifn?."
  [fns & more]
  {:pre [(every? ifn? fns)]}
  (future
    (doseq [callback fns]
      (apply notify callback more))))
;; from: http kit
;; add from Jetbrains

;; Starts a server whose handler is slow, fires a request from a future, then
;; stops the server while that request is still in flight. The client call is
;; expected to throw, which the future maps to {:status "fail"}.
(deftest test-immediate-close-kills-inflight-requests
  (let [server (run-server (slow-request-handler 2000) {:port 3474})
        resp (future (try (http/get "http://localhost:3474")
                          (catch Exception _ {:status "fail"})))]
    ;; Give the request time to reach the handler before closing.
    (Thread/sleep 100)
    ;; run-server returns a function that stops the server. Without this call
    ;; the request would simply complete and the port would stay bound.
    (server)
    (is (= "fail" (:status @resp)))))