Anton Harald

Reputation: 5944

Core.async: Take all values from collection of promise-chans

Consider a dataset like this:

(def data [{:url "http://www.url1.com" :type :a}
           {:url "http://www.url2.com" :type :a}
           {:url "http://www.url3.com" :type :a}
           {:url "http://www.url4.com" :type :b}])

The contents of those URLs should be requested in parallel. Depending on each item's :type value, the contents should be parsed by the corresponding function. The parsing functions return collections, which should be concatenated once all the responses have arrived.

So let's assume that there are functions parse-a and parse-b, which both return a collection of strings when they are passed a string containing HTML content.

It looks like core.async could be a good tool for this. One could either have a separate channel for each item or one single channel. I'm not sure which way would be preferable here. With several channels one could use transducers for the postprocessing/parsing. There is also a special promise-chan, which might be appropriate here.

Here is a code sketch that uses a callback-based http-kit function. Unfortunately, I could not find a generic solution for collecting the values inside the go block, so the channels are taken from one by one.

(defn f [data] 
  (let [chans (map (fn [{:keys [url type]}] 
                     (let [c (promise-chan (map ({:a parse-a :b parse-b} type)))] 
                       (http/get url {} #(put! c %))
                       c))
                   data)
        result-c (promise-chan)] 
    (go (put! result-c (concat (<! (nth chans 0))
                               (<! (nth chans 1))
                               (<! (nth chans 2))
                               (<! (nth chans 3)))))
    result-c))

The result can be read like so:

(go (prn (<! (f data))))

Upvotes: 3

Views: 1976

Answers (4)

danieltan95

Reputation: 860

If anyone is still looking at this, here is an addition to the answer by @OlegTheCat:

You can use a separate channel for errors.

(:require [cljs.core.async :as async]
          [cljs-http.client :as http])
(:require-macros [cljs.core.async.macros :refer [go]])

(go (as-> [(http/post <url1> <params1>)
           (http/post <url2> <params2>)
           ...]
          chans
          (async/merge chans (count chans))
          (async/reduce conj [] chans)
          (async/<! chans)
          (<callback> chans)))
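The snippet above collects every response into one vector. One way to get a separate channel for errors is to split the merged channel with a predicate. The following is only a sketch on the JVM with made-up data: fake-request and collect-split are hypothetical helpers, and the :error key is an assumption about how a failed response is marked, not cljs-http's actual response shape.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical stand-in for an HTTP call: a channel that yields one
;; response map and then closes.
(defn fake-request [resp]
  (let [c (a/chan 1)]
    (a/put! c resp)
    (a/close! c)
    c))

(def chans [(fake-request {:status 200 :body "one"})
            (fake-request {:error "timeout"})   ; assumed error marker
            (fake-request {:status 200 :body "two"})])

(defn collect-split
  "Merges chans and routes responses with an :error key to a separate
  channel. Both reductions start before either result is awaited, so
  neither output of a/split can stall the other."
  [chans]
  (let [[errors oks] (a/split :error (a/merge chans))
        errors-c     (a/reduce conj [] errors)
        oks-c        (a/reduce conj [] oks)]
    {:errors (a/<!! errors-c)
     :ok     (a/<!! oks-c)}))

(def outcome (collect-split chans))
```

Because a/merge makes no ordering promise, the order of the responses under :ok may vary between runs.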

Upvotes: 0

Dehli

Reputation: 5960

I wanted this functionality as well, because I really like core.async but also wanted to use it in certain places like traditional JavaScript promises. I came up with a solution using macros. In the code below, <? is the same thing as <!, except that it throws if the value taken is an error. The macro behaves like Promise.all(): if all channels succeed, it returns a vector of all the values taken from them; otherwise the first error is thrown (since <? throws that value).

(defmacro <<? [chans]
  `(let [res# (atom [])]
     (doseq [c# ~chans]
       (swap! res# conj (serverless.core.async/<? c#)))
     @res#))

If you'd like to see the full context of the function, it's located on GitHub. It's heavily inspired by David Nolen's blog post.

Upvotes: 1

Terje Norderhaug

Reputation: 3689

Use pipeline-async from core.async to launch asynchronous operations like http/get concurrently while delivering the results in the same order as the input:

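(let [result (chan)]
  (pipeline-async
    20 result
    (fn [{:keys [url type]} ch]
      (let [parse ({:a parse-a :b parse-b} type)
            ;; put! calls its callback with a boolean, so close! is wrapped
            ;; in a one-argument fn instead of (partial close! ch)
            callback #(put! ch (parse %) (fn [_] (close! ch)))]
        (http/get url {} callback)))
    (to-chan data))
  result)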
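To try the same idea without a network, here is a minimal sketch in which the HTTP call is replaced by a hypothetical fake-get that answers from another thread after a random delay. parse-a, parse-b, fake-get and fetch-ordered are illustrative stand-ins, not part of any library, and the final reduce is just one way to concatenate the parsed collections.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical parsers: each returns a collection of strings.
(defn parse-a [body] [(str "a:" body)])
(defn parse-b [body] [(str "b:" body)])

;; Hypothetical callback-style fetch; responds after a random delay
;; so that requests finish out of order.
(defn fake-get [url callback]
  (future
    (Thread/sleep (rand-int 20))
    (callback url)))

(defn fetch-ordered
  "Returns a channel that yields one vector of all parsed items,
  in the order of the input data."
  [data]
  (let [result (a/chan)]
    (a/pipeline-async
      4 result
      (fn [{:keys [url type]} ch]
        (let [parse ({:a parse-a :b parse-b} type)]
          (fake-get url
                    (fn [body]
                      ;; deliver the parsed value, then close ch so the
                      ;; pipeline can move on to the next result
                      (a/put! ch (parse body) (fn [_] (a/close! ch)))))))
      (a/to-chan data))
    ;; result closes once the input is exhausted, so reduce terminates
    (a/reduce into [] result)))

(def ordered
  (a/<!! (fetch-ordered [{:url "u1" :type :a}
                         {:url "u2" :type :a}
                         {:url "u3" :type :b}])))
```

Even though the simulated responses arrive in random order, pipeline-async emits them in input order, so the concatenated vector follows the order of data.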

Upvotes: 0

OlegTheCat

Reputation: 4513

I'd say that promise-chan does more harm than good here. The problem is that most of the core.async API (a/merge, a/reduce, etc.) relies on the fact that channels will close at some point, whereas promise-chans never do.
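A small REPL sketch of that behaviour (the names p and takes are only for illustration): once a promise-chan holds a value, every take returns that value, even after close!, so a consumer waiting for nil as the end-of-stream signal never sees it.

```clojure
(require '[clojure.core.async :as a])

;; Fulfil a promise-chan once, then close it.
(def p (a/promise-chan))
(a/>!! p :done)
(a/close! p)

;; Each take still yields the delivered value; nil never shows up.
(def takes [(a/<!! p) (a/<!! p) (a/<!! p)])
```

An a/reduce over such a channel would therefore keep reading the same value instead of finishing.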

So, if sticking with core.async is crucial for you, the better solution is not to use promise-chan but an ordinary channel instead, which is closed after the first put!:

...
(let [c (chan 1 (map ({:a parse-a :b parse-b} type)))]
  (http/get url {} #(do (put! c %) (close! c)))
  c)
...

At this point, you're working with channels that close, and things become a bit simpler. To collect all the values you could do something like this:

;; (go (put! result-c (concat (<! (nth chans 0))
;;                            (<! (nth chans 1))
;;                            (<! (nth chans 2))
;;                            (<! (nth chans 3)))))
;; instead of above, now you can do this:
(->> chans
     async/merge
     (async/reduce into []))
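Putting the two fragments together, a self-contained sketch could look like the following. The HTTP call is replaced by a hypothetical fake-get so it runs without a network, and parse-a, parse-b, fetch-chan and fetch-all are illustrative names rather than anything from the question or a library.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical parsers: each returns a collection of strings.
(defn parse-a [html] [(str "a:" html)])
(defn parse-b [html] [(str "b:" html)])

;; Hypothetical callback-style fetch; a real version would call http/get.
(defn fake-get [url callback]
  (future (callback (str "<html>" url "</html>"))))

(defn fetch-chan
  "Returns a channel that yields the parsed body once and then closes."
  [{:keys [url type]}]
  (let [c (a/chan 1 (map ({:a parse-a :b parse-b} type)))]
    (fake-get url #(do (a/put! c %) (a/close! c)))
    c))

(defn fetch-all
  "Returns a channel that yields one vector with all parsed items."
  [data]
  (->> data
       (mapv fetch-chan)       ; mapv starts every request eagerly
       a/merge
       (a/reduce into [])))

(def result
  (a/<!! (fetch-all [{:url "u1" :type :a}
                     {:url "u2" :type :b}])))
```

Note that a/merge does not preserve the order of the input channels, so the items in result may appear in any order.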

UPD (below are my personal opinions):

It seems that using core.async channels as promises (either in the form of a promise-chan or a channel that closes after a single put!) is not the best approach. As things grow, it turns out that the core.async API overall is (as you may have noticed) not as pleasant as it could be. There are also several unsupported constructs that may force you to write less idiomatic code than you otherwise would. In addition, there is no built-in error handling (if an error occurs within a go block, the go block silently returns nil), so to address this you'll need to come up with something of your own (reinvent the wheel). Therefore, if you need promises, I'd recommend using a dedicated library for that, for example manifold or promesa.

Upvotes: 4
