Micah
Micah

Reputation: 10395

clojure core.async thread blocks

I have the following code:

(defn -db-producer-factory [order-ids-chan next-chan]
  (thread
    (prn "db starting...")
    (while true
      (do
        (prn "db starting2...")
        ;;
        ;; issue spot!
        ;; it stays blocked here-- the orderid doesnt come off the chan
        ;;
        (let [order-id (<!! order-ids-chan)]
          (prn "db->" order-id)
          (condp = order-id
            :finished (>!! next-chan :finished)
            :>> (supress-w-nextexc
                  (->>
                    ; get denorm'd order
                    (-> (r/-get-order :live order-id)
                        denorm/order->denormalized)
                    ; put in a map to avoid nils
                    (hash-map :data)
                    (>!! next-chan)))))))))

(defn -stats-producer-factory [stats-db-chan next-chan]
  (thread
    (while true
      (do
        (let [msg (<!! stats-db-chan)
              data (:data msg)]
          (when data
            (do
              (prn "stats-> " (-> data :order :order-id))
              (supress-w-nextexc
                (q/stats-order-insert (-> data :order)))
              (supress-w-nextexc
                (q/stats-item-insert (-> data :items)))))
          (>!! next-chan msg))))))

(defn -do-orderids [orderids]
  (let [finished-chan (chan)
        order-ids-chan (chan)
        stats-db-chan (chan)
        db-producer (-db-producer-factory order-ids-chan stats-db-chan)
        stats-producer (-stats-producer-factory stats-db-chan finished-chan)]

    (prn "pre-pub")
    ;; pub ids and finished message
    (map #(>!! order-ids-chan %) (conj orderids :finished))
    ;; wait for finish
    (prn "finished? " (<!! finished-chan))
    ;; allow time for finishing
    ;(Thread/sleep 3000)
    ;; close all chans
    (map close! [finished-chan order-ids-chan stats-db-chan db-producer stats-producer])
    ))

The process is initiated via a call to -do-orderids like (-do-orderids [123]).

The output of the execution is:

"db starting..."
"pre-pub"
"db starting2..."

But then it blocks. Why doesn't it pass the orderid at the "issue spot"?

Upvotes: 1

Views: 214

Answers (1)

glts
glts

Reputation: 22734

Your program blocks because the db-producer, who’s blocked waiting for an order ID, never actually receives an order ID on order-ids-chan.

That is because map is lazy. So, in this invocation

(map #(>!! order-ids-chan %) (conj orderids :finished))

the mapping function is never called and no order ID is ever put onto the channel.

Clojure rule of thumb:

Never use map for side-effects! Use run! instead.

I think substituting run! for map in that line (and on the last line where you call close! on the channel) should fix it.

Unrelated: all the do forms you used are redundant, you can safely remove them and reduce the level of nesting.

Upvotes: 3

Related Questions