Reputation: 1516
I'm trying to create a proxy for ArrayBlockingQueue that intercepts calls to it for monitoring:
(ns clj-super-bug.core
  (:import [java.util.concurrent ArrayBlockingQueue Executors]))
(let [thread-count 10
      put-count 100
      executor (Executors/newFixedThreadPool thread-count)
      puts (atom 0)
      queue (proxy [ArrayBlockingQueue] [1000]
              (put [el]
                (proxy-super put el)
                (swap! puts inc)))]
  (.invokeAll executor (repeat put-count #(.put queue 0)))
  (assert (= (.size queue) put-count) "should have put in put-count items")
  (println @puts))
I would expect this code to always print 100, but occasionally it prints something else, such as 51. Am I using proxy or proxy-super wrong?
I debugged this to the point where it seems that the proxy method is not actually called on some occasions, only the base method (the items do show up in the queue, as the assert confirms). I also suppose it's multithreading related, because if I set thread-count to 1 it always prints 100.
Upvotes: 1
Views: 149
Reputation: 1516
Turns out this is a known issue with proxy-super: https://dev.clojure.org/jira/browse/CLJ-2201
"If you have a proxy with method M, which invokes proxy-super, then while that proxy-super is running all calls to M on that proxy object will immediately invoke the super M not the proxied M." That's exactly what's happening.
Upvotes: 2
Reputation: 29958
I would not create the subclass via proxy.
If you subclass ArrayBlockingQueue, you are saying your code is an instance of ABQ. So, you are making a specialized version of ABQ, and must take responsibility for all of the implementation details of the ABQ source code.
However, you don't need to be an instance of ABQ. All you really need is to use an instance of ABQ, which is easily done by composition.
So, we write a wrapper function which delegates to an ABQ:
(ns tst.demo.core
  (:use demo.core tupelo.core tupelo.test)
  (:require
    [clojure.string :as str]
    [clojure.java.io :as io])
  (:import [java.util.concurrent ArrayBlockingQueue Executors TimeUnit]))
(dotest
  (let [N 100
        puts-done (atom 0)
        abq (ArrayBlockingQueue. (+ 3 N))
        putter (fn []
                 (.put abq 0)
                 (swap! puts-done inc))]
    (dotimes [_ N]
      (future (putter)))
    (Thread/sleep 1000)
    (println (format "N: %d puts-done: %d" N @puts-done))
    (assert (= N @puts-done)
      (format "should have put in puts-done items; N = %d puts-done = %d" N @puts-done))))
result:
N: 100 puts-done: 100
Using the executor:
(dotest
  (let [N 100
        puts-done (atom 0)
        thread-count 10
        executor (Executors/newFixedThreadPool thread-count)
        abq (ArrayBlockingQueue. (+ 3 N))
        putter (fn []
                 (.put abq 0)
                 (swap! puts-done inc))
        putters (repeat N #(putter))]
    (.invokeAll executor putters)
    (println (format "N: %d puts-done: %d" N @puts-done))
    (assert (= N @puts-done)
      (format "should have put in puts-done items; N = %d puts-done = %d" N @puts-done))))
result:
N: 100 puts-done: 100
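The same composition idea could also be packaged as a small class instead of a closure. The sketch below is in Java purely for illustration; CountingQueue and CountingQueueDemo are hypothetical names, not part of any library. It assumes only put needs to be monitored, so the rest of the BlockingQueue API is deliberately not exposed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: it owns a queue instead of extending one.
class CountingQueue<E> {
    private final BlockingQueue<E> delegate;
    private final AtomicInteger puts = new AtomicInteger();

    CountingQueue(int capacity) {
        this.delegate = new ArrayBlockingQueue<>(capacity);
    }

    // Delegate first, then count, mirroring the putter function above.
    void put(E el) throws InterruptedException {
        delegate.put(el);
        puts.incrementAndGet();
    }

    int size() { return delegate.size(); }

    int putCount() { return puts.get(); }
}

class CountingQueueDemo {
    // Runs n puts on a fixed pool and returns {queue size, counted puts}.
    static int[] run(int n, int threads) throws InterruptedException {
        CountingQueue<Integer> q = new CountingQueue<>(n + 3);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                tasks.add(() -> { q.put(0); return null; });
            }
            executor.invokeAll(tasks); // blocks until every task has finished
        } finally {
            executor.shutdown();
        }
        return new int[] { q.size(), q.putCount() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(100, 10);
        System.out.println("size: " + r[0] + " puts-done: " + r[1]);
    }
}
```

Because the wrapper never overrides anything, there is no superclass dispatch to get wrong; the trade-off is that callers must go through the wrapper rather than pass it where a BlockingQueue is expected.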
Regarding the cause, I'm not sure. I tried to fix the original version with locking, but no joy:
(def lock-obj (Object.))
(dotest
  (let [N 100
        puts-done (atom 0)
        thread-count 10
        executor (Executors/newFixedThreadPool thread-count)
        abq (proxy [ArrayBlockingQueue]
              [(+ 3 N)]
              (put [el]
                (locking lock-obj
                  (proxy-super put el)
                  (swap! puts-done inc))))]
    (.invokeAll executor (repeat N #(.put abq 0)))
    (println (format "N: %d puts-done: %d" N @puts-done))))
with results:
N: 100 puts-done: 46
N: 100 puts-done: 71
N: 100 puts-done: 85
N: 100 puts-done: 83
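A possible explanation for why the lock makes no difference, going by the CLJ-2201 description quoted in the other answer: while one thread is inside proxy-super, calls to put from other threads skip the overriding function entirely, so they never reach the locking form or the counter. The Java sketch below is a simplified model of that behaviour, not Clojure's actual implementation. ModelProxyQueue, its boolean flag, and the two latches are assumptions made up for illustration; the latches exist only to force the interleaving.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a proxy object: put() runs the override if it is
// installed, otherwise it falls straight through to the superclass method.
class ModelProxyQueue extends ArrayBlockingQueue<Integer> {
    final AtomicInteger puts = new AtomicInteger();
    // Per-object flag standing in for the proxy's shared method mapping.
    private volatile boolean overrideInstalled = true;
    // Hooks used only to make the race deterministic in this demo.
    final CountDownLatch insideSuperCall = new CountDownLatch(1);
    final CountDownLatch otherPutDone = new CountDownLatch(1);

    ModelProxyQueue(int capacity) { super(capacity); }

    @Override
    public void put(Integer el) throws InterruptedException {
        if (!overrideInstalled) {
            super.put(el); // override is switched off: nothing gets counted
            return;
        }
        // Model of "proxy-super": switch the override off for the whole
        // object, call the base method, then switch it back on.
        overrideInstalled = false;
        try {
            insideSuperCall.countDown(); // signal: override is now off
            otherPutDone.await();        // wait for the other thread's put
            super.put(el);
        } finally {
            overrideInstalled = true;
        }
        puts.incrementAndGet();
    }
}

class ProxySuperRaceDemo {
    // Returns {queue size, counted puts}.
    static int[] run() throws InterruptedException {
        ModelProxyQueue q = new ModelProxyQueue(10);
        Thread a = new Thread(() -> {
            try {
                q.put(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        a.start();
        q.insideSuperCall.await(); // thread a is inside its "proxy-super"
        q.put(2);                  // bypasses the override and the counter
        q.otherPutDone.countDown();
        a.join();
        return new int[] { q.size(), q.puts.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println("size=" + r[0] + " counted=" + r[1]);
    }
}
```

With the interleaving forced this way, the demo reports two items in the queue but only one counted put, the same kind of shortfall as in the results above. A lock placed inside the override cannot help in this model, because the bypassing call never enters the override.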
I tried some more tests using a Java subclass of ABQ:
package demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class Que<E> extends ArrayBlockingQueue<E> {
    public static AtomicInteger numPuts = new AtomicInteger(0);
    public static Que<Integer> queInt = new Que<>(999);
    public Que(int size) { super(size); }
    public void put(E element) {
        synchronized (numPuts) {
            try {
                super.put(element);
                numPuts.getAndIncrement();
            } catch (Exception ex) {
                System.out.println("caught " + ex);
            }
        }
    }
}
...
(:import [java.util.concurrent Executors TimeUnit]
[demo Que] ) )
(dotest
  (let [N 100
        puts-done (atom 0)
        thread-count 10
        executor (Executors/newFixedThreadPool thread-count)]
    (.invokeAll executor (repeat N #(.put Que/queInt 0)))
    (println (format "N: %d puts-done: %d" N (.get Que/numPuts)))))
results (repeated runs => accumulation):
N: 100 puts-done: 100
N: 100 puts-done: 200
N: 100 puts-done: 300
N: 100 puts-done: 400
N: 100 puts-done: 500
So it works great with a Java subclass. I get the same results with or without the synchronized block.
So, it looks to be something in the Clojure proxy area.
Upvotes: 1