Matheus Moreira

Reputation: 2388

Proper way to ensure clj-http's connection manager is closed after all requests are done

I have some code that combines clj-http, core.async facilities, and an atom. It creates several threads to fetch and parse a bunch of pages:

(defn fetch-page
  ([url] (fetch-page url nil))
  ([url conn-manager]
    (-> (http.client/get url {:connection-manager conn-manager})
        :body hickory/parse hickory/as-hickory)))

(defn- create-worker
  [url-chan result conn-manager]
  (async/thread
    (loop [url (async/<!! url-chan)]
      (when url
        (swap! result assoc url (fetch-page url conn-manager))
        (recur (async/<!! url-chan))))))

(defn fetch-pages
  [urls]
  (let [url-chan (async/to-chan urls)
        pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
        conn-manager (http.conn-mgr/make-reusable-conn-manager {})
        workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                      (range n-cpus))]
    ; wait for workers to finish and shut conn-manager down
    (dotimes [_ n-cpus] (async/alts!! workers))
    (http.conn-mgr/shutdown-manager conn-manager)

    (mapv #(get @pages %) urls)))

The idea is to use multiple threads to reduce the time to fetch and parse the pages, but I'd also like to avoid overloading the server by sending a lot of requests at once; that is why I used a connection manager. I don't know if my approach is correct, so suggestions are welcome. Currently the problem is that the last requests fail because the connection manager is shut down before they terminate:

Exception in thread "async-thread-macro-15" java.lang.IllegalStateException: Connection pool shut down

The main questions: how do I close the connection manager at the right moment (and why does my current code fail to do so)? The side quest: is my approach right? If not, what could I do to fetch and parse multiple pages at once without overloading the server?

Thanks!

Upvotes: 4

Views: 928

Answers (2)

Josh

Reputation: 4816

I believe Alejandro is correct about the reason for your error, and this is logical: the IllegalStateException indicates that you shut down the connection manager before all requests had completed, so some of the workers were likely still running when you shut it down.

Another solution I'll propose stems from the fact that nothing in your create-worker thread actually requires a channel, which async/thread implicitly creates. So you can replace it with a future, like so:

(defn- create-worker
  [url-chan result conn-manager]
  (future
    (loop [url (async/<!! url-chan)]
      (when url
        (swap! result assoc url (fetch-page url conn-manager))
        (recur (async/<!! url-chan))))))

And in your fetch-pages function, "join" by derefing:

(doseq [worker workers]
  @worker) ; alternatively, use deref to specify timeout 

This eliminates a good deal of core.async machinery from what is not a core.async problem to begin with. It does, of course, depend on keeping your method of collecting the data as-is, that is, using swap! on an atom to track page data. If you were to send the result of fetch-page out onto a return channel, or something similar, then you'd want to keep your current thread approach.
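To illustrate that alternative, a worker that reports results on a channel rather than an atom might look like the sketch below. Note that result-chan is a hypothetical channel: the caller would create it and then take exactly (count urls) values off it before shutting the manager down, which also determines the right shutdown moment.

```clojure
;; Sketch: each worker puts [url page] pairs onto result-chan instead of
;; swap!-ing an atom. result-chan is assumed to be created by the caller.
(defn- create-worker
  [url-chan result-chan conn-manager]
  (async/thread
    (loop [url (async/<!! url-chan)]
      (when url
        ;; blocking put; backpressure comes from the channel's buffer
        (async/>!! result-chan [url (fetch-page url conn-manager)])
        (recur (async/<!! url-chan))))))
```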

Regarding your concern about overloading the server: you have not yet defined what it means to "overload" the server. There are two dimensions to this: one is the rate of requests (number of requests per second, for example), and the other is the number of concurrent requests. Your current app has n worker threads, and that is the effective concurrency cap (along with the settings in the connection manager). But this does nothing to bound the rate of requests per second.

This is a little more complicated than it might seem, though it is possible. You have to consider the total number of requests made by all threads per unit of time, and managing that is not something to tackle fully in one answer here. I suggest you do some research on throttling and rate limiting, give it a go, and then come back with questions.
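As a starting point for that research, one minimal approach (a sketch, not a complete solution; the interval value and the limiter name are assumptions) is a token channel that every worker must take from before each request:

```clojure
;; Sketch of a global rate limiter: an unbuffered channel receives one
;; token per interval, so all takers combined see at most one token
;; every interval-ms milliseconds.
(defn rate-limit-chan
  [interval-ms]
  (let [c (async/chan)]
    (async/go-loop []
      (async/>! c :token)                  ; parks until a worker takes it
      (async/<! (async/timeout interval-ms))
      (recur))
    c))

;; In each worker, before calling fetch-page:
;;   (async/<!! limiter)
```

Because the channel is unbuffered, tokens are never stockpiled while workers are busy, so the request rate across all threads stays at or below one per interval.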

Upvotes: 1

Alejandro C.

Reputation: 3801

The problem is that async/alts!! returns as soon as the first worker channel yields a result, and since workers never changes, subsequent calls can keep returning immediately from the same already-closed channel. I think using async/merge to build a single channel and then repeatedly reading off of it should work.

(defn fetch-pages
  [urls]
  (let [url-chan (async/to-chan urls)
        pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
        conn-manager (http.conn-mgr/make-reusable-conn-manager {})
        workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                      (range n-cpus))
        all-workers (async/merge workers)]
    ; wait for workers to finish and shut conn-manager down
    (dotimes [_ n-cpus] (async/<!! all-workers))
    (http.conn-mgr/shutdown-manager conn-manager)

    (mapv #(get @pages %) urls)))

Alternatively, you could recur and keep shrinking workers, so that you only wait on workers that haven't finished yet.

(defn fetch-pages
  [urls]
  (let [url-chan (async/to-chan urls)
        pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
        conn-manager (http.conn-mgr/make-reusable-conn-manager {})
        workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                      (range n-cpus))]
    ; wait for workers to finish and shut conn-manager down
    (loop [workers workers]
      (when (seq workers)
        (let [[_ finished-worker] (async/alts!! workers)]
          (recur (filterv #(not= finished-worker %) workers)))))

    (http.conn-mgr/shutdown-manager conn-manager)    
    (mapv #(get @pages %) urls)))

Upvotes: 1
