dashenswen
dashenswen

Reputation: 600

Concurrent polling downstream dependencies and wait until all of them succeed

I am trying to write a simple function that long-polls multiple messages tothe downstream dependency without exhausting it and only exist when all messages succeeded.

I came up with a way to wrap each message polling into a callable and use a ExecutorService to submit a list of callables.


    public void poll(final List<Long> messageIdList) {
        ExecutorService executorService = Executors.newFixedThreadPool(messageIdList.size());
        List<MessageStatusCallable> callables = messageIdList.stream()
                .map(messageId -> new MessageStatusCallable(messageId)).collect(Collectors.toList());
        boolean allSuccess = false;
        try {
            allSuccess = executorService.invokeAll(callables).stream().allMatch(success -> {
                try {
                    return success.get().equals(Boolean.TRUE);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return false;
                } catch (ExecutionException e) {
                    e.printStackTrace();
                    return false;
                }
            });
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    private class MessageStatusCallable implements Callable<Boolean> {
        private Long messageId;
        public MessageStatusCallable(Long messageId) {
            this.messageId = messageId;
        }

        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        @Override
        public Boolean call() throws Exception {
            String messageStatus = downstreamService.getMessageStatus(messageId);
            while(messageStatus == null || !messageStatus.equals( STATUS_VALUE_SUCCEEDED) {
                messageStatus = messageLogToControlServer.getMessageStatus(messageId);
                Thread.sleep(TimeUnit.MICROSECONDS.toMillis(100));
            }
            LOG.info("Message: " + messageId + " Succeded");
            return true;
        }
    }

I wonder if there is a better way to achieve this since Thread.sleep is blocking and ugly.

Upvotes: 0

Views: 112

Answers (1)

Marc G. Smith
Marc G. Smith

Reputation: 886

I'm not sure this is the best solution but it occurred to me you could use a CountDownLatch and ScheduledExecutorService.

    public void poll(final List<Long> messageIdList) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(messageIdList.size());
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(POOL_SIZE);
        try {
            for (Long messageId : messageIdList) {
                MessageStatusCallable callable = new MessageStatusCallable(messageId, latch);
                executorService.scheduleWithFixedDelay(
                        () -> {
                            String messageStatus = downstreamService.getMessageStatus(messageId);
                            if (STATUS_VALUE_SUCCEEDED.equals(messageStatus)) {
                                latch.countDown();
                                throw new CompletionException("Success - killing the task", null);
                            }
                        }, 
                        0, 100, TimeUnit.MILLISECONDS);
            }

            latch.await();

        } finally {
            executorService.shutdown();
        }
    }

I probably also wouldn't have the Runnable as a lambda other than for brevity in the answer.

Upvotes: 1

Related Questions