;; 4/6/2011 - 6:13 PM


(ns gist.globhfs
  (:import [cascading.tap GlobHfs]))

;; ### Bucket to Cluster
;;
;; To get tuples back out of our directory structure on S3, we employ
;; Cascading's `GlobHfs` tap (`cascading.tap.GlobHfs`), along with an
;; interface tailored for datasets stored in the MODIS sinusoidal
;; projection. For details on the globbing syntax, see the
;; documentation for Hadoop's `FileSystem.globStatus`.

(defn globstring
  "Takes a path ending in `/` and collections of datasets,
  resolutions, tiles and (optionally) batches, and returns a
  globstring formatted for cascading's GlobHFS. `*` may be substituted
  in for any argument but the basepath; it produces a `*` wildcard
  segment at that level. When batches is omitted it defaults to `*`.

  Each collection becomes a `{a,b,...}/` alternation segment, and the
  segments are appended to basepath in the order datasets,
  resolutions, tiles, batches.

    Example Usage:
    (globstring \"s3://bucket/\" [\"ndvi\" \"evi\"] [\"1000-32\"] *)
    ;=> \"s3://bucket/{ndvi,evi}/{1000-32}/*/*/\"

    (globstring \"s3://bucket/\" * * [\"008006\" \"033011\"])
    ;=> \"s3://bucket/*/*/{008006,033011}/*/\""
  ([basepath datasets resolutions tiles]
     (globstring basepath datasets resolutions tiles *))
  ([basepath datasets resolutions tiles batches]
     (letfn [(wrap [coll]
               ;; [\"a\" \"b\"] -> \"{a,b}/\"
               (format "{%s}/" (apply str (interpose "," coll))))
             (bracketize [arg]
               ;; The function `*` itself is the wildcard sentinel.
               (if (= * arg) "*/" (wrap arg)))]
       ;; basepath has to lead the result; without it the glob is
       ;; relative and does not match the documented examples.
       (apply str
              basepath
              (map bracketize
                   [datasets resolutions tiles batches])))))

(defn globhfs-seqfile
  "Takes a glob pattern string (such as one built by `globstring`) and
  returns a `GlobHfs` tap constructed from the scheme produced by
  `(w/sequence-file Fields/ALL)` and that pattern.

  NOTE(review): `w/` and `Fields` are not brought into scope by the
  visible ns form -- presumably `cascalog.workflow` aliased as `w` and
  `cascading.tuple.Fields`; confirm the namespace declares them."
  [pattern]
  (GlobHfs. (w/sequence-file Fields/ALL) pattern))

(defn read-test
  "Test query over every tuple matched by a glob. `path` and `pieces`
  are handed straight to `globstring` (so `pieces` are its datasets,
  resolutions, tiles and optional batches arguments), and the
  resulting pattern is opened with `globhfs-seqfile`. The query sinks
  into `(stdout)` and emits ?dataset, ?tilestring, ?date and the
  ?count produced by `c/count`.

  NOTE(review): `?<-`, `stdout` and `c/` are not brought into scope by
  the visible ns form -- presumably cascalog.api and cascalog.ops;
  confirm the namespace declares them."
  [path & pieces]
  (let [pattern (apply globstring path pieces)
        tuples  (globhfs-seqfile pattern)]
    (?<- (stdout)
         [?dataset ?tilestring ?date ?count]
         (tuples ?dataset ?s-res ?t-res ?tilestring ?date ?chunkid ?chunk)
         (c/count ?count))))

;; Example usage, for my particular application:
;; (globstring "s3://bucket/" ["ndvi" "evi"] ["1000-32"] *)
;; => "s3://bucket/{ndvi,evi}/{1000-32}/*/*/"
;; so the test query above is run with the same arguments:
;; (read-test "s3://bucket/" ["ndvi" "evi"] ["1000-32"] *)