Samuel Urbanowicz
Samuel Urbanowicz

Reputation: 834

Java sharing objects by multiple threads - design pattern needed

I would like to get some advice on a simple multithreading system I am designing.

The idea: The application is capturing frames and displaying them in 1st imageview. These captured frames are also being processed (by MyHandDetectionThread) and then displayed in 2nd imageview.

My solution:

public class VideoManager {
    private volatile BufferLinkedList<InputFrame> mInputFrames;
    private volatile BufferLinkedList<ProcessedFrame> mProcessedFrames;

    private static VideoManager mVideoManagerInstance = new VideoManager();

    private Timer captureTimer;
    private MyVideoCaptureThread myVideoCaptureThread;
    private MyFrameDisplayThread myFrameDisplayThread;
    private MyHandDetectionThread myHandDetectionThread;
    private MyProcessedFrameDisplayThread myProcessedFrameDisplayThread;

    private enum ThreadMessages {
        PROCESS_INPUT_FRAME,
        NEW_INPUT_FRAME,
        NEW_PROCESSED_FRAME_ARRIVED,
        GET_NEW_FRAME
    }

    public static VideoManager getInstance() {
        if (mVideoManagerInstance == null) {
            mVideoManagerInstance = new VideoManager();
        }
        return mVideoManagerInstance;
    }

    // not visible constructor - for singleton purposes
    private VideoManager() {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
    }

    public void startDetectionAndRecognition(ImageView camIV, ImageView handIV) {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);

        captureTimer = new Timer();

        myVideoCaptureThread = new MyVideoCaptureThread();
        myFrameDisplayThread = new MyFrameDisplayThread(camIV, handIV);
        myHandDetectionThread = new MyHandDetectionThread();
        myProcessedFrameDisplayThread = new MyProcessedFrameDisplayThread();

        captureTimer.schedule(new TimerTask() {
            public void run() {
                if (myVideoCaptureThread != null && myVideoCaptureThread.threadMessages != null)
                    myVideoCaptureThread.threadMessages.offer(ThreadMessages.GET_NEW_FRAME);
            }
        }, 0, 1000 / Config.fps);
        myFrameDisplayThread.start();
        myVideoCaptureThread.start();
        myHandDetectionThread.start();
        myProcessedFrameDisplayThread.start();
    }

    public void stop() {
        captureTimer.cancel();
        myVideoCaptureThread.interrupt();
        myHandDetectionThread.interrupt();
        myFrameDisplayThread.interrupt();
        myGestureRecogitionThread.interrupt();

        mInputFrames.removeAll(mInputFrames);
        mProcessedFrames.removeAll(mProcessedFrames);

        isActive = false;
    }

    public boolean isActive() {
        return isActive;
    }

    ////////////////////////
    // Thread clases
    ////////////////////////
    private class MyVideoCaptureThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128);

        @Override
        public void run() {
            WebCamVideoCapture vc = new WebCamVideoCapture();
            while (!isInterrupted()) {
                if (threadMessages != null && threadMessages.poll() == ThreadMessages.GET_NEW_FRAME) {
                    Mat mat = vc.getNextMatFrame();
                    if (mat != null && mInputFrames != null) {
                        mInputFrames.offerFirst(new InputFrame(mat));

                        if (myFrameDisplayThread != null && myFrameDisplayThread.threadMessages != null)
                            myFrameDisplayThread.threadMessages.offer(ThreadMessages.NEW_INPUT_FRAME);

                        if (myHandDetectionThread != null && myHandDetectionThread.threadMessages != null)
                            myHandDetectionThread.threadMessages.offer(ThreadMessages.PROCESS_INPUT_FRAME);
                    }
                }
            }
            vc.close();
        }
    }

    private class MyFrameDisplayThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128);

        ImageView mCamImageView;


        long lastUpdatedCamImageViewMillis;
        long lastUpdatedHandImageViewMillis;

        public MyFrameDisplayThread(ImageView mImageView) {
            this.mCamImageView = mImageView;
        }

        private synchronized void updateImageViews() {
            if (threadMessages.poll() == ThreadMessages.NEW_INPUT_FRAME && mInputFrames != null && !mInputFrames.isEmpty() && mInputFrames.peek() != null && mInputFrames.peek().getFrame() != null) {
                if(Config.IS_DEBUG) System.out.println("Updating frame image view");
                mCamImageView.setImage(Utils.cvMatToImage(mInputFrames.peekFirst().getFrame()));
            } 
        }

        @Override
        public void run() {
            while (!isInterrupted()) {
                updateImageViews();
            }
        }
    }

    private class MyHandDetectionThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128); //TODO if multiple threads, define it out of class
        HandDetector hd = new HandDetector();

        @Override
        public void run() {
            while (!isInterrupted()) {
                if (threadMessages.poll() == ThreadMessages.PROCESS_INPUT_FRAME && mInputFrames != null && mInputFrames.size() > 0 && mInputFrames.peek() != null) {
                    if(Config.IS_DEBUG) System.out.println("Detecting hand...");

                    mProcessedFrames.offerFirst(new ProcessedFrame(hd.detectHand(mInputFrames.peek()), null, null, null));

                    if (myGestureRecogitionThread != null && myGestureRecogitionThread.threadMessages != null)
                        myGestureRecogitionThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);

                    if(myFrameDisplayThread != null && myFrameDisplayThread.threadMessages != null)
                        myFrameDisplayThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);
                }
            }
        }
    }

    private class MyProcessedFrameDisplayThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128);
        ImageView mHandImageView;
        public MyProcessedFrameDisplayThread(ImageView mHandImageView) {
            mHandImageView = mHandImageView;
        }

        private synchronized void updateImageViews() {
            if(threadMessages.poll() == ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED && mProcessedFrames != null && !mProcessedFrames.isEmpty() && mProcessedFrames.peek() != null && mProcessedFrames.peek().getmHandMask() != null) {
                if(Config.IS_DEBUG) System.out.println("Updating hand image view");
                mHandImageView.setImage(Utils.cvMatToImage(mProcessedFrames.peekFirst().getmHandMask()));
            }
        }

        @Override
        public void run() {
            while (!isInterrupted())
                if (threadMessages.poll() == ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED)
                    updateImageViews();
        }

    }
}

public class BufferLinkedList<E> extends LinkedList<E> {
    private int counter = 0;
    private int sizeLimit = 48;

    public BufferLinkedList(int sizeLimit) {
        this.sizeLimit = sizeLimit;
    }

    @Override
    public synchronized boolean offerFirst(E e) {
        while(size() > sizeLimit) {
            removeLast();
        }

        return super.offerFirst(e);
    }

    @Override
    public synchronized E peekFirst() {
        return super.peekFirst();
    }

    @Override
    public synchronized E peekLast() {
        return super.peekLast();
    }

    @Override
    public synchronized E pollFirst() {
        return super.pollFirst();
    }

    @Override
    public synchronized E pollLast() {
        return super.pollLast();
    }
}

My problems: The frames are not displayed smoothly. there are a irregular, 1-5 seconds breaks between methods updating imageviews are fired. However the MyHandDetectionThread's task runs preaty quickly. And sizes of message queues of Display Threads are increasing fast. Maybe this is because of some locks on lists storing the frames?

Question: Is my solution correct? Are there some design patterns describing this scenario? Do you have some suggestions for improvement?

Edit: I've added waiting and notifying in thread loops. Results are satisfying. The CPU cost is now ~30% comparing to ~80% before. Everything runs more stable and smoothier. However, I'm not familiar with waiting and notifying aproach. So let me know if you find something stupid in my code.

public class VideoManager {
    private volatile BufferLinkedList<InputFrame> mInputFrames;
    private volatile BufferLinkedList<ProcessedFrame> mProcessedFrames;

    private static VideoManager mVideoManagerInstance = new VideoManager();

    private Timer captureTimer;
    private MyVideoCaptureThread myVideoCaptureThread;
    private MyFrameDisplayThread myFrameDisplayThread;
    private MyHandDetectionThread myHandDetectionThread;
    private MyGestureRecogitionThread myGestureRecogitionThread;
    private MySkinDisplayThread mySkinDisplayThread;

    private final static int THREAD_MESSAGES_LIMIT = 10000;
    private final static int TIMER_INTERVAL = 1000 / Config.fps;
    private final static int WAITING_TIMEOUT = 2000;

    private enum ThreadMessages {
        PROCESS_INPUT_FRAME,
        NEW_INPUT_FRAME,
        NEW_PROCESSED_FRAME_ARRIVED,
        GET_NEW_FRAME
    }

    public static VideoManager getInstance() {
        if (mVideoManagerInstance == null) {
            mVideoManagerInstance = new VideoManager();
        }
        return mVideoManagerInstance;
    }

    // not visible constructor - for singleton purposes
    private VideoManager() {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
    }

    public void startDetectionAndRecognition(ImageView camIV, ImageView handIV) {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);

        captureTimer = new Timer();
        myFrameDisplayThread = new MyFrameDisplayThread(camIV);
        myVideoCaptureThread = new MyVideoCaptureThread();
        myHandDetectionThread = new MyHandDetectionThread();
        myGestureRecogitionThread = new MyGestureRecogitionThread();
        mySkinDisplayThread = new MySkinDisplayThread(handIV);

        myVideoCaptureThread.start();
        captureTimer.schedule(new TimerTask() {
            public void run() {
                if (myVideoCaptureThread != null && myVideoCaptureThread.threadMessages != null) {
                    myVideoCaptureThread.threadMessages.offer(ThreadMessages.GET_NEW_FRAME);
                    System.out.println("Timer get frame request sent");
                    myVideoCaptureThread.wakeUp();
                }
            }
        }, 0, TIMER_INTERVAL);
        myFrameDisplayThread.start();
        mySkinDisplayThread.start();
        myHandDetectionThread.start();
        myGestureRecogitionThread.start();
    }

    public void stop() {
        captureTimer.cancel();
        myVideoCaptureThread.interrupt();
        myHandDetectionThread.interrupt();
        mySkinDisplayThread.interrupt();
        myFrameDisplayThread.interrupt();
        myGestureRecogitionThread.interrupt();

        mInputFrames.removeAll(mInputFrames);
        mProcessedFrames.removeAll(mProcessedFrames);

    }

    ////////////////////////
    // Lock class
    ////////////////////////
    private static final class Lock {}

    ////////////////////////
    // Thread clases
    ////////////////////////
    private class MyVideoCaptureThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        WebCamVideoCapture vc = new WebCamVideoCapture();
        Lock lock = new Lock();

        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.poll() != ThreadMessages.GET_NEW_FRAME) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                            System.out.println("WideoCaptureThread waiting");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Mat mat = vc.getNextMatFrame();
                    System.out.println("getting next frame from webcam");
                    if (mat != null && mInputFrames != null) {
                        mInputFrames.offerFirst(new InputFrame(vc.getNextMatFrame()));

                        if (myHandDetectionThread != null && myHandDetectionThread.threadMessages != null) {
                            myHandDetectionThread.wakeUp();
                            myHandDetectionThread.threadMessages.offer(ThreadMessages.PROCESS_INPUT_FRAME);
                        }

                        if (myFrameDisplayThread != null && myFrameDisplayThread.threadMessages != null) {
                            myFrameDisplayThread.wakeUp();
                            myFrameDisplayThread.threadMessages.offer(ThreadMessages.NEW_INPUT_FRAME);
                        }
                    }
                }
            }
        }

        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up WideoCapture");
            }
        }

        @Override
        public void interrupt() {
            vc.close();
            super.interrupt();
        }
    }

    private class MyFrameDisplayThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        Lock lock = new Lock();

        ImageView mCamImageView;

        public MyFrameDisplayThread(ImageView mImageView) {
            this.mCamImageView = mImageView;
        }

        private void updateImageViews() {
            if (shouldUpdateCamImageView() && mInputFrames != null && !mInputFrames.isEmpty() && mInputFrames.peek() != null && mInputFrames.peek().getFrame() != null) {
                System.out.println("Updating frame image view");
                mCamImageView.setImage(Utils.cvMatToImage(mInputFrames.peekFirst().getFrame()));
                threadMessages.poll();
            }
        }

        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.peek() != ThreadMessages.NEW_INPUT_FRAME) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    updateImageViews();
                }
            }
        }

        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up FrameDisplay");
            }
        }

        private boolean shouldUpdateCamImageView() {
            if (!Config.CAPTURE_PREVIEW_MODE) return false;
            return true;
        }
    }

    private class MySkinDisplayThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        ImageView mHandImageView;
        Object lock = new Lock();

        public MySkinDisplayThread(ImageView mHandImageView) {
            this.mHandImageView = mHandImageView;
        }

        private synchronized void updateHandImageView() {
            if (shouldUpdateHandImageView() && mProcessedFrames != null && !mProcessedFrames.isEmpty() && mProcessedFrames.peek() != null && mProcessedFrames.peek().getmHandMask() != null) {
                System.out.println("Updating skin image view");
                mHandImageView.setImage(Utils.cvMatToImage(mProcessedFrames.peekFirst().getmHandMask()));
                threadMessages.poll();
            }
        }

        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.peek() != ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    updateHandImageView();
                }
            }
        }

        private boolean shouldUpdateHandImageView() {
            if (!Config.SKIN_MASK_PREVIEW_MODE) return false;
            return true;
//            long now = System.currentTimeMillis();
//            boolean should = now - lastUpdatedHandImageViewMillis > TIMER_INTERVAL;
//            lastUpdatedHandImageViewMillis = now;
//            return should;
        }

        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up FrameDisplay");
            }
        }
    }

    private class MyHandDetectionThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT); //TODO if multiple threads, define it out of class
        HandDetector hd = new HandDetector();
        Object lock = new Lock();

        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.poll() != ThreadMessages.PROCESS_INPUT_FRAME) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    if (mInputFrames != null /*&& mInputFrames.size() > 0 && mInputFrames.peek() != null && !mInputFrames.peek().getIsProcessed()*/) {
                        System.out.println("Detecting hand...");
//                    Mat handMask = hd.detectHand(mInputFrames.peek());
//                    int[][] fingerCoordinates = new int[5][2];
//                    int[] convDefects = new int[5];
//                    int[] handCenterCoordinates = new int[2];
                        mProcessedFrames.offerFirst(new ProcessedFrame(hd.detectHand(mInputFrames.peek()), null, null, null));
                        if (myGestureRecogitionThread != null && myGestureRecogitionThread.threadMessages != null) {
                            myGestureRecogitionThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);
                            mySkinDisplayThread.wakeUp();
                        }

                        if (mySkinDisplayThread != null && mySkinDisplayThread.threadMessages != null) {
                            mySkinDisplayThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);
                            mySkinDisplayThread.wakeUp();
                        }
                    }
                }
            }
        }

        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up hand Detection");
            }
        }
    }

    private class MyGestureRecogitionThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        GestureRecognizer r = new GestureRecognizer();
        Lock lock = new Lock();

        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.poll() != ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        r.lookForGestures(mProcessedFrames);
                    }
                }
            }
        }

        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up hand Detection");
            }
        }
    }
} 

Upvotes: 0

Views: 147

Answers (1)

Mapsy
Mapsy

Reputation: 4272

Both threads seem to use polling in their run() method; i.e. they continuously loop around a statement checking a boolean condition. This can be bad for CPU usage, because a single thread can lock up the CPU without giving any cycles to other threads; it can end up hogging the CPU, even though it's not doing anything too useful; just failing some boolean condition.

You should use an asynchronous method for communicating with Threads; rather than using a polling mechanism, you should put threads to sleep when they are not required to do any processing, and wake them up once they're needed. This allows threads to yield the CPU, meaning that they willingly relinquish their active context so that other threads can execute.

Upvotes: 2

Related Questions