Gakuo

Reputation: 855

How to execute parallel transactions in Clojure

I have a sequence of customers that needs to be processed in parallel. I tried to use pmap for that, but the result is painfully slow, much slower than a sequential implementation. The inner function process-customer runs a transaction. Obviously, pmap launches all the transactions at once and they end up retrying, which kills performance. What is the best way to parallelize this?

(defn process-customers [customers]
  (doall 
    (pmap 
      (fn [sub-customers]
        (doseq [customer sub-customers]
          (process-customer customer)))
      (partition-all 10 customers))))

EDIT: The process-customer function involves the steps below. I list only the steps for brevity. All the steps run inside a transaction to ensure that another parallel transaction does not cause inconsistencies such as negative stock.

(defn- process-customer
  "Process `customer`. Consists of three steps:
  1. Finding all stores in which the requested products are still available.
  2. Sorting the found stores to find the cheapest (for the sum of all products).
  3. Buying the products by updating the `stock`."
  [customer]
  ;; body omitted for brevity; all three steps run inside one transaction
  )

EDIT 2: The version of process-customers below has the same performance as the parallel process-customers above, even though it is obviously sequential.

(defn process-customers
  "Process `customers` one by one. In this code, this happens sequentially."
  [customers]
  (doseq [customer customers]
    (process-customer customer)))

Upvotes: 0

Views: 345

Answers (1)

rmcv

Reputation: 1976

I assume your transaction locks the inventory for the full life cycle of process-customer. This will be slow because all customers are racing for the same universe of stores. If you can split the process into two phases, 1) quoting and 2) fulfilling, and apply a transaction only to (2), performance should be much better. Alternatively, if you buy into agent programming, the transaction boundary is defined for you automatically at the message level. Here is one sample you can consider:

(defn get-best-deal
  "Returns the best deal for a given order with given stores (agent)"
  [stores order]
  ;;
  ;; request for quotation from 1000 stores (in parallel)
  ;;
  (doseq [store stores]
    (send store get-quote order))
  ;;
  ;; wait for reply, up to 0.5s
  ;;
  (apply await-for 500 stores)
  ;;
  ;; sort and find the best store
  ;;
  (when-let [best-store (->> stores
                             (filter (fn [store] (get-in @store [:quotes order])))
                             (sort-by (fn [store] (->> (get-in @store [:quotes order])
                                                       vals
                                                       (reduce +))))
                             first)]
    {:best-store best-store
     :invoice-id (do
                   ;; execute the order
                   (send best-store fulfill order)
                   ;; wait for the transaction to complete
                   (await best-store)
                   ;; get an invoice id
                   (get-in @best-store [:invoices order]))}))

To find the best deals from 1,000 stores for 100 orders (289 line items in total) across 100 products:

(->> orders
     (pmap (partial get-best-deal stores))
     (filter :invoice-id)
     count
     time)
;; => 57
;; "Elapsed time: 312.002328 msecs"
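
As a rough illustration of why the agent version avoids transaction retries: actions sent to a single agent are applied one at a time, in the order they were queued, so concurrent senders never conflict on that agent's state. The sketch below is not part of the benchmark above; `demo-store` and `buy-one` are hypothetical names used only for this example.

```clojure
;; Hypothetical store agent holding a single stock count.
(def demo-store (agent {:stock 100}))

(defn buy-one
  "Agent action: decrement the stock if any is left, otherwise leave the state unchanged."
  [state]
  (if (pos? (:stock state))
    (update state :stock dec)
    state))

;; 150 purchase attempts sent from several threads at once.
(dorun (pmap (fn [_] (send demo-store buy-one)) (range 150)))

;; Block until the actions queued so far have been applied.
(await demo-store)

(:stock @demo-store)
;; => 0 (never negative, and nothing had to retry)
```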

Sample business logic:

(defn get-quote
  "issue a quote by checking inventory"
  [store {:keys [order-items] :as order}]
  (if-let [quote (->> order-items
                   (reduce reduce-inventory
                           {:store store
                            :quote nil})
                   :quote)]
    ;; has inventory to generate a quote
    (assoc-in store [:quotes order] quote)
    ;; no inventory
    (update store :quotes dissoc order)))

(defn fulfill
  "fulfill an order if previously quoted"
  [store order]
  (if-let [quote (get-in store [:quotes order])]
    ;; check inventory again and generate invoice
    (let [[invoice inventory'] (check-inventory-and-generate-invoice store order)]
      (cond-> store
        invoice (->
                  ;; register invoice
                  (assoc-in [:invoices order] invoice)
                  ;; invalidate the quote
                  (update :quotes dissoc order)
                  ;; update inventory
                  (assoc :inventory inventory'))))
    ;; not quoted before
    store))
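
If you would rather stay with refs than switch to agents, the two-phase split mentioned at the top of this answer could look roughly like the sketch below. It is only an illustration under assumptions: the data layout (`ref-stores`, with `:stock` and `:prices` maps) and every function name here are hypothetical, since the original `process-customer` body was not posted. The point is that quoting reads snapshots without a transaction, and the `dosync` touches a single store for as short a time as possible.

```clojure
;; Hypothetical data: each store is a ref holding stock and prices per product.
(def ref-stores
  [(ref {:name "A" :stock {:apple 5} :prices {:apple 3}})
   (ref {:name "B" :stock {:apple 5} :prices {:apple 2}})])

(defn in-stock?
  "True when the store snapshot can cover every item of the order."
  [store-val order]
  (every? (fn [[product qty]]
            (>= (get-in store-val [:stock product] 0) qty))
          order))

(defn order-total
  "Total price of the order at this store."
  [store-val order]
  (reduce + (map (fn [[product qty]]
                   (* qty (get-in store-val [:prices product])))
                 order)))

(defn quote-stores
  "Phase 1, no transaction: candidate store refs, cheapest first."
  [stores order]
  (->> stores
       (filter #(in-stock? @% order))
       (sort-by #(order-total @% order))))

(defn fulfill!
  "Phase 2, short transaction on one store. Re-checks the stock because
  the quote may be stale. Returns true if the purchase went through."
  [store order]
  (dosync
    (if (in-stock? @store order)
      (do (alter store update :stock #(merge-with - % order))
          true)
      false)))

(defn process-order
  "Tries the candidates cheapest first; returns the selling store's name or nil."
  [stores order]
  (some (fn [store]
          (when (fulfill! store order)
            (:name @store)))
        (quote-stores stores order)))

(process-order ref-stores {:apple 4}) ;; => "B"
(process-order ref-stores {:apple 4}) ;; => "A" (B has only 1 left)
(process-order ref-stores {:apple 4}) ;; => nil (nobody has 4 left)
```

Because each transaction now reads and writes a single ref, two customers only conflict when they buy from the same store at the same moment, and a retry repeats just the cheap re-check rather than the whole search.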


Upvotes: 1
