
Throttling processes with Clojure core.async


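I am trying to use Clojure core.async channels to throttle memory-intensive concurrent processes. Each process loads an image into memory and applies a watermark. If I try to process too many images at once, I get an OOM error.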

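The pattern below seems to work, but it feels a bit inelegant. My question is: is there a better way to do this with core.async? Or should I just use the Java concurrency facilities for this (i.e. create a fixed-size thread pool, etc.)?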
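For comparison, the fixed-size thread pool alternative might look roughly like the sketch below. The helper name `pooled-map` and its arguments are hypothetical, not part of any library; only `java.util.concurrent` is assumed.

```clojure
(import '(java.util.concurrent Executors Future))

(defn pooled-map
  "Hypothetical helper: applies f to every item using a fixed pool of n
  threads, so at most n calls run at once. Returns results in input order."
  [n f items]
  (let [pool (Executors/newFixedThreadPool n)]
    (try
      ;; Clojure functions implement java.util.concurrent.Callable, so a
      ;; vector of zero-argument fns can be handed straight to invokeAll.
      (let [tasks   (mapv (fn [item] (fn [] (f item))) items)
            futures (.invokeAll pool tasks)] ; blocks until every task is done
        ;; .get throws an ExecutionException if the task itself failed.
        (mapv (fn [^Future fut] (.get fut)) futures))
      (finally
        (.shutdown pool)))))
```

Calling `(pooled-map 20 some-fn images)` would cap concurrency at 20, with `some-fn` standing in for whatever does the watermarking.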

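The basic concept in the code below is to use a global fixed-size channel, tchan, to throttle what goes into in-chan, essentially limiting the number of concurrent processes to the size of tchan.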

In the code below, process-images is the entry point.

(def tbuff (buffer 20))

(def tchan
  "tchan is used to throttle the number of processes
  tbuff is a fixed size buffer"
  (chan tbuff))

(defn accum-results
  "Accumulates the images in results-chan"
  [n result-chan]
  (let [chans [result-chan (timeout timeout-ms)]]
    (loop [imgs-out  []
           remaining n]
      (if (zero? remaining)
        imgs-out
        (let [[img-result _] (alts!! chans)]
          (if (nil? img-result)
            (do
              (log/warn "Image processing timed out")
              (go (dotimes [_ remaining] (<! tchan)))
              imgs-out)
            (do
              (go (<! tchan))
              (recur (conj imgs-out img-result) (dec remaining)))))))))

(defn process-images
  "Concurrently watermarks a list of images
  Images is a sequence of maps representing image info
  Concurrently fetches each actual image and applies the watermark
  Returns a map of image info map -> image input stream"
  [images]
  (let [num-imgs (count images)
        in-chan  (chan num-imgs)
        out-chan (chan num-imgs)]
    ;; set up the image-map consumer
    ;; asynchronously process things found on in-chan
    (go
      (dotimes [_ num-imgs]
        ; block here on input images
        (let [img-in (<! in-chan)]
          (thread
            (let [img-out (watermark/watermarked-image-is img-in)]
              (>!! out-chan [img-in img-out]))))))
    ;; put images on in-chan
    (go
      (doseq [img images]
        (>! tchan :x)
        (>! in-chan img)))
    ;; accum results
    (let [results (accum-results num-imgs out-chan)]
      (log/info (format "Processed %s of %s images and tbuff is %s"
                        (count results) num-imgs (count tbuff)))
      (into {} results))))
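
The throttling idea in the code above, a fixed-size channel whose buffer slots act as permits, can be isolated into a minimal sketch. The name `throttled-map` is hypothetical; the sketch assumes only `clojure.core.async` and is meant to illustrate the pattern rather than replace the code above.

```clojure
(require '[clojure.core.async :refer [chan thread >!! <!!]])

(defn throttled-map
  "Hypothetical helper: runs (f item) on its own thread for each item,
  with at most n calls in flight. Returns results in input order.
  A call that returns nil or throws yields nil in the result vector."
  [n f items]
  (let [tokens  (chan n) ; buffer size = maximum concurrency
        results (mapv (fn [item]
                        ;; Acquire a slot; blocks while n tasks are running.
                        (>!! tokens :token)
                        ;; thread returns a channel that receives the result.
                        (thread
                          (try
                            (f item)
                            (finally
                              ;; Release the slot even if f throws.
                              (<!! tokens)))))
                      items)]
    (mapv <!! results)))
```

Because the token is taken before each thread starts and given back only after `f` returns, no more than `n` calls to `f` can overlap.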

1 Answer


    I believe this is exactly what pipeline is for.

    Here is an example:

    user> (require '[clojure.core.async :refer [<! <!! chan go go-loop pipeline pipeline-blocking pipeline-async] :as async])
    
    user> (let [output (chan)
                input (async/to-chan (range 10))]
            (go-loop [x (<! output)]
              (println x))
            (pipeline 4
                      output
                      (map #(do
                              (Thread/sleep (rand-int 200))
                              (println "starting" %)
                              (Thread/sleep 1000)
                              (println "finished" %)
                              (inc %)))
                      input))
    #object[clojure.core.async.impl.channels.ManyToManyChannel 0x3f434b5a "clojure.core.async.impl.channels.ManyToManyChannel@3f434b5a"]
    user> starting 0
    starting 3
    starting 1
    starting 2
    finished 0
    1
    finished 3
    finished 1
    finished 2
    starting 4
    starting 5
    starting 6
    finished 4
    finished 5
    finished 6
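
The session above appears to stall after item 6 because its go-loop takes only one value from output and never recurs, so the pipeline backs up once its internal buffers fill. A sketch closer to the image use case drains the output fully and uses `pipeline-blocking`, which is intended for work that blocks a thread. The names `watermark-all` and `watermark-fn` are hypothetical stand-ins for the asker's code.

```clojure
(require '[clojure.core.async :as async :refer [chan <!! pipeline-blocking]])

(defn watermark-all
  "Hypothetical helper: applies watermark-fn to every image with at most
  n calls running at once. Returns a map of image -> result."
  [n watermark-fn images]
  (let [;; to-chan matches the example above; newer core.async releases
        ;; also provide to-chan!.
        input  (async/to-chan images)
        output (chan)]
    ;; Starts n worker threads; output is closed once input is exhausted.
    (pipeline-blocking n
                       output
                       (map (fn [img] [img (watermark-fn img)]))
                       input)
    ;; async/into collects every pair until output closes, so the
    ;; pipeline is never left blocked on an unread channel.
    (<!! (async/into {} output))))
```

With this shape the concurrency limit is just the first argument, for example `(watermark-all 20 some-watermark-fn images)`, and no separate token channel is needed.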
    
