Reputation: 333
I have a requirement for a function that when called with particular input args executes a supplied function g, but only after another supplied function f has finished executing with the same input args. There is also a requirement that when the function is called multiple times with the same input args, f is only executed once on the first call, and the other calls wait for this to complete, then execute g directly.
Edit: The solution should work when run in parallel on different threads, and should also use threads efficiently. E.g. blocking should be on a per input basis rather than the whole function.
My first attempt at the function is as follows:
(import 'java.util.concurrent.CountDownLatch)

(defn dependent-func
  ([f g]
   (let [mem (atom {})]
     (fn [& args]
       (->> (get (locking mem
                   (swap! mem (fn [latch-map args]
                                (if (contains? latch-map args)
                                  latch-map
                                  (let [new-latch (CountDownLatch. 1)
                                        new-latch-map (assoc latch-map args new-latch)]
                                    (->> (Thread. #(do (apply f args)
                                                       (.countDown new-latch)))
                                         (.start))
                                    new-latch-map))) args)) args)
            (.await))
       (apply g args)))))
This appears to meet my requirements, and awaits on f are on a per-input basis, so I'm relatively happy with that. Initially I had hoped to just use swap! to update mem, but unfortunately the documentation for swap! explicitly states that the function passed to it may be called multiple times (I have seen this in testing). As a result I ended up having to lock on mem when updating, which is really ugly.
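The retry behaviour of swap! can be reproduced deterministically. The following is only an illustrative sketch: the names state and attempts are made up for this example, and the reset! inside the update function merely simulates another thread changing the atom mid-update.

```clojure
(def state (atom 0))
(def attempts (atom 0))   ; hypothetical counter of how often the update fn runs

(swap! state
       (fn [v]
         ;; On the first attempt only, simulate interference from another
         ;; thread by changing the atom while this function is running.
         (when (= 1 (swap! attempts inc))
           (reset! state 100))
         (inc v)))

@attempts ; => 2, the update function ran twice
@state    ; => 101, the retry saw the interfering value
```

Any side effect placed directly in the update function, such as starting a thread, would therefore also run twice.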
I am sure there must be a cleaner way of doing this that leverages Clojure's concurrency mechanisms better than I have, but so far I've been unable to find it.
Any advice would be greatly appreciated.
Thanks,
Matt.
Upvotes: 4
Views: 1951
Reputation: 29958
Something like this is a much simpler answer to your question:
(defn waiter
  [f g & args]
  (let [f-result (f args)
        g-result (g args)]
    (println (format "waiter: f-result=%d g-result=%d" f-result g-result))))

(defn my-f
  [args]
  (let [result (apply + args)]
    (println "my-f running:" result)
    result))

; change your orig prob a bit, and define/use my-f-memo instead of the original my-f
(def my-f-memo (memoize my-f))

(defn my-g
  [args]
  (let [result (apply * args)]
    (println "my-g running:" result)
    result))

(waiter my-f-memo my-g 2 3 4)
(waiter my-f-memo my-g 2 3 4)
> lein run
my-f running: 9
my-g running: 24
waiter: f-result=9 g-result=24
my-g running: 24
waiter: f-result=9 g-result=24
main - enter
If you change the problem statement a bit and pass in a memoized version of your first function f, the solution is much easier.
Just calling the functions in sequence in a (let [...]...) form enforces the completion of the first before the execution of the second function.
Also, you could force the waiter function to do the memoization of f for you, but it would be a bit more work to manually simulate what memoize already does.
The original problem didn't explicitly imply it needed to work in a concurrent environment. If multiple threads are an issue, just change the definition of waiter to be:
(defn waiter
  [f g & args]
  (let [f-result (locking f (f args))
        g-result (g args)]
    (println (format "waiter: f-result=%d g-result=%d" f-result g-result))))
Upvotes: 1
Reputation: 91857
There is little purpose to starting up a thread to run f on, if the very next thing you will do is wait for that thread to complete. You might as well just run f on the current thread. In that case, your problem decomposes nicely into two subproblems:
1. Memoize f without risking concurrent execution like the standard memoize does.
2. Call the memoized f, and then call g.
Let's solve these in reverse order, by first assuming (my-memoize f) works as you need it to, and then later writing it:
(defn dependent-func [f g]
  (let [f' (my-memoize f)]
    (fn [& args]
      (apply f' args)
      (apply g args))))
Very simple with a competent memoize, right? Now, to implement memoize there are a few things you can do. You could use locking, as you did, and I think that's pretty reasonable, since you explicitly want to prevent concurrent execution; once you throw out the thread-launching business it is very easy as well:
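(defn my-memoize [f]
  (let [memo (atom {})]
    (fn [& args]
      ;; A single lock guards the whole cache, so calls with different
      ;; args also wait on each other while f runs.
      (locking memo
        (if (contains? @memo args)
          (get @memo args)
          ;; swap! returns the updated map; look the new value up by args
          (get (swap! memo assoc args (apply f args)) args))))))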
Or you can reinvent locking yourself, by storing a delay in the atom and then having each call dereference it instead:
(defn my-memoize [f]
  (let [memo (atom {})]
    (fn [& args]
      (-> memo
          (swap! update-in [args]
                 (fn [v]
                   (or v (delay (apply f args)))))
          (get args)
          (deref)))))
It's readable and "clever", because it does everything in a swap!, and I felt quite smug back when I figured this out the first time, but later I realized that this is just hijacking the mutex in Delay.deref() to accomplish locking, so honestly I think you might as well just use locking to make it clear there is a lock.
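To check that the delay-based version behaves as required, here is a small sketch that repeats the two definitions above so it is self-contained. The names f-calls, slow-f, df and results are hypothetical and exist only for this test; the 200 ms sleep stands in for real work.

```clojure
(defn my-memoize [f]
  (let [memo (atom {})]
    (fn [& args]
      (-> memo
          (swap! update-in [args] (fn [v] (or v (delay (apply f args)))))
          (get args)
          (deref)))))

(defn dependent-func [f g]
  (let [f' (my-memoize f)]
    (fn [& args]
      (apply f' args)
      (apply g args))))

;; Hypothetical test fixtures
(def f-calls (atom 0))           ; counts how often slow-f actually runs

(defn slow-f [x]
  (swap! f-calls inc)
  (Thread/sleep 200)             ; stand-in for slow work
  x)

(def df (dependent-func slow-f inc))

;; Start eight concurrent calls with the same argument, then wait for all.
(def results
  (mapv deref (doall (repeatedly 8 #(future (df 4))))))

results   ; => [5 5 5 5 5 5 5 5]
@f-calls  ; => 1
;; When running this as a script, call (shutdown-agents) at the end
;; so the future thread pool does not keep the JVM alive.
```

Even if swap! retries and builds several delays, only the delay that ends up stored in the map is ever dereferenced, so slow-f runs once per distinct argument list and callers with other arguments are not blocked by it.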
Upvotes: 1
Reputation: 91534
Clojure's combination of future, promise, and deliver is well suited to starting a process and having several threads wait for it to finish.
Future is used to start a thread in the background (it can do more, though in this example I didn't need it to)
Promise is used to immediately return an object that will contain the answer once it is ready.
Deliver is used to supply the promised answer once it is ready.
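As a minimal sketch of how the three fit together (the names answer, worker, waiters and waiter-results are made up for this illustration, and the sleep stands in for slow work):

```clojure
(def answer (promise))              ; returned immediately, not yet realized

(def worker
  (future                           ; runs on a background thread
    (Thread/sleep 100)              ; stand-in for slow work
    (deliver answer 42)))           ; supplies the promised value

;; Several threads block on the same promise until it is delivered.
(def waiters
  (doall (repeatedly 3 #(future (inc @answer)))))

(def waiter-results (mapv deref waiters))

waiter-results      ; => [43 43 43]
(realized? answer)  ; => true
```

Dereferencing a promise blocks until deliver has been called, and every waiter sees the same value.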
I'll also split the waiting part into its own function to make the code easier to follow, and so I can use the built-in memoize function:
This question is a very good example of when to use promise and deliver rather than simply a future.
Because we are going to use memoize where it's not safe to run the function twice, we need to be careful that two calls don't enter memoize at exactly the same time. So we are going to lock only the moment we enter memoize, not the duration of the memoized function.
hello.core> (def lock [])
#'hello.core/lock
This function will always return the same promise object every time it is called with a given f and set of arguments. We still need to make memoize safe by wrapping it in a function that does the locking (you could also use an agent for this):
hello.core> (def wait-for-function-helper
              (memoize (fn [f args]
                         (let [answer (promise)]
                           (println "waiting for function " f " with args" args)
                           (future (deliver answer (apply f args)))
                           answer))))
#'hello.core/wait-for-function-helper
hello.core> (defn wait-for-function [& args]
              (locking lock
                (apply wait-for-function-helper args)))
#'hello.core/wait-for-function
And now we write the actual dependent-func function, which uses the safely memoized, promise-returning wait-for-function:
hello.core> (defn dependent-func [f g & args]
              @(wait-for-function f args)
              (apply g args))
#'hello.core/dependent-func
And define a slow operation to see it in action:
hello.core> (defn slow-f-1 [x]
              (println "starting slow-f-1")
              (Thread/sleep 10000)
              (println "finishing slow-f-1")
              (dec x))
#'hello.core/slow-f-1
and to test it we want to start two of the same function at exactly the same time.
hello.core> (do (future
                  (println "first" (dependent-func slow-f-1 inc 4)))
                (future
                  (println "second" (dependent-func slow-f-1 inc 4))))
waiting for function
#object[clojure.core$future_call$reify__6736 0x40534083 {:status :pending, :val nil}] with args (4)
#object[hello.core$slow_f_1 0x4f9b3396 hello.core$slow_f_1@4f9b3396]
starting slow-f-1
finishing slow-f-1
second
first
5
5
and if we call it again we see that slow-f-1 only ever ran once:
hello.core> (do (future
                  (println "first" (dependent-func slow-f-1 inc 4)))
                (future
                  (println "second" (dependent-func slow-f-1 inc 4))))
#object[clojure.core$future_call$reify__6736 0x3935ea29 {:status :pending, :val nil}]
first 5
second 5
Upvotes: 6