future 用法 (含有CountDownLatch) core.async alts!
;;
;; core.async的alts! 实现部分 不是用Promise,有reduce,还看不明白.
;;
(defn do-alt [alts clauses]
(assert (even? (count clauses)) "unbalanced clauses")
(let [clauses (core/partition 2 clauses)
opt? #(keyword? (first %))
opts (filter opt? clauses)
clauses (remove opt? clauses)
[clauses bindings]
(core/reduce
(fn [[clauses bindings] [ports expr]]
(let [ports (if (vector? ports) ports [ports])
[ports bindings]
(core/reduce
(fn [[ports bindings] port]
(if (vector? port)
(let [[port val] port
gp (gensym)
gv (gensym)]
[(conj ports [gp gv]) (conj bindings [gp port] [gv val])])
(let [gp (gensym)]
[(conj ports gp) (conj bindings [gp port])])))
[[] bindings] ports)]
[(conj clauses [ports expr]) bindings]))
[[] []] clauses)
gch (gensym "ch")
gret (gensym "ret")]
`(let [~@(mapcat identity bindings)
[val# ~gch :as ~gret] (~alts [~@(apply concat (core/map first clauses))] ~@(apply concat opts))]
(cond
~@(mapcat (fn [[ports expr]]
[`(or ~@(core/map (fn [port]
`(= ~gch ~(if (vector? port) (first port) port)))
ports))
(if (and (seq? expr) (vector? (first expr)))
`(let [~(first expr) ~gret] ~@(rest expr))
expr)])
clauses)
(= ~gch :default) val#))))
;;
;; mq push和 pull操作使用future
;;
(testing "push / pull"
(mq/with-context context 2
(with-open [s0 (-> context
(mq/socket mq/pull)
(mq/bind url))
s1 (-> context
(mq/socket mq/push)
(mq/connect url))]
(let [msg (uuid)
push (future (mq/send s1 msg))
pull (future (mq/recv s0))]
(is (= msg (String. @pull)))))))
;;
;; from Dennis http://blog.fnil.net/blog/a51d8166c5b6444cb0bcb303704a9faf/
;;
;; 有一段业务代码要调用一个第三方接口来查询域名备案号,但是呢,这个第三方接口非常
;; 不稳定,经常查询出错或者超时,导致这个业务经常不可用。
(defn query-icp [domain]
;; HTTP 调用第三方接口 API 。
(query-icp-from-thirdparty1 domain))
;; 为了提高这个接口的稳定性,我们引入另一个查询服务,想让两个服务来竞争,
;; 谁能返回正常结果,就用谁的。假设这个服务封装成了函数 query-icp-from-thirdparty2
(defn query-icp [domain]
(or
(query-icp-from-thirdparty1 domain)
(query-icp-from-thirdparty2 domain)))
;; 最终版本:
;; 借助 future和Deliver Promise
(defn- do-query-icp [p f domain]
(future
(when-let [ret (f domain)]
;; 两个线程公用一个Promise, 谁先完成谁就deliver消息即可
;; 和core.async的alts! 类似, 不知到async是不是就是用的Promise? (update: 没有关系,见下一个文件async的代码)
(deliver p ret))))
(defn query-icp [domain]
(let [p (promise)]
(do-query-icp p query-icp-from-thirdparty1 domain)
(do-query-icp p query-icp-from-thirdparty2 domain)
(deref p :5000 nil)))
(defn sh
{:added "1.2"}
[& args]
(let [[cmd opts] (parse-args args)
proc (.exec (Runtime/getRuntime)
^"[Ljava.lang.String;" (into-array cmd)
(as-env-strings (:env opts))
(as-file (:dir opts)))
{:keys [in in-enc out-enc]} opts]
(if in
(future
(with-open [os (.getOutputStream proc)]
(copy in os :encoding in-enc)))
(.close (.getOutputStream proc)))
(with-open [stdout (.getInputStream proc)
stderr (.getErrorStream proc)]
(let [out (future (stream-to-enc stdout out-enc))
err (future (stream-to-string stderr))
exit-code (.waitFor proc)]
{:exit exit-code :out @out :err @err}))))
;;
;; korma.test
;; 这里有用到 CountDownLatch
(deftest thread-safe-with-db
(testing "with-db using binding"
(let [db1 (mem-db)
db2 (mem-db-2)]
;; let's create some records in the first db
(with-db db1 (populate 2))
;; let's create some records in the second db
(with-db db2 (populate 2))
(default-connection db1)
;; we will create 2 threads and execute them at the same time
;; using a CountDownLatch to synchronize their execution
;; countDown 里主要3个方法:
;; 1. (CountDownLatch. 2) 初始化数量
;; 2. (.countDown latch) 准备就绪
;; 3. (.await latch) 等待他人
(let [latch (CountDownLatch. 2)
t1 (future (with-db db2
(.countDown latch)
(.await latch)
(delete-some-rows-from-db)))
t2 (future (with-db db1
(.countDown latch)
(.await latch)
(delete-some-rows-from-db)))]
@t1
@t2
;; 主线程在@t1 @t2后面表示等待t1和t2执行完毕才算结束
(.await latch))
(default-connection nil)
(let [addresses-mem-db-1 (with-db db1
(select address))
addresses-mem-db-2 (with-db db2
(select address))]
(is (= (count addresses-mem-db-1) 0))
(is (= (count addresses-mem-db-2) 0))))))
;;
;; from wile.core
;;
(defn executor [] (Executors/newCachedThreadPool))
(defn notify
"Invokes a function for it's side effects & ignore it's results."
[f & more]
{:pre [(ifn? f)]}
(io!
(try (apply f more)
(catch Throwable t
(log/error t "Exception Encountered Notifying Spy")
nil)
(finally nil))))
(defn notify-all
"Invokes a coll of fns for their side effects & ignore it's results. Immediately
returns a future."
[fns & more]
{:pre [(every? ifn? fns)]}
(future (doseq [f fns] (apply notify f more))))
;;
;; from: http kit
;;
;; add from Jetbrains
(deftest test-immediate-close-kills-inflight-requests
(let [server (run-server (slow-request-handler 2000) {:port 3474})
resp (future (try (http/get "http://localhost:3474")
(catch Exception e {:status "fail"})))]
(Thread/sleep 100)
(server)
(is (= "fail" (:status @resp)))))