St.Antario

Reputation: 27385

Ignite: put data into a cache asynchronously

I'm working on a heavily loaded application and would like to be able to put data into a cache asynchronously. That is, I want to invoke the operation on a cache and receive an IgniteFuture<V> representing the result.

I need to parallelize extracting data from persistent storage and putting it into a cache. If something goes wrong, I can extract the data again (that is fine: there is not much data, and it is quite fine-grained).

Upvotes: 0

Views: 275

Answers (1)

hagrawal7777

Reputation: 14658

If you do not have a hard requirement on IgniteFuture (and I believe you should not), and all you want is a mechanism to put data into a cache asynchronously and then process the results returned by that operation, you can use Java's ExecutorService.

If you are not familiar with Java's ExecutorService, you may want to read the documentation or this answer, which highlights the key points. I have also added comments to the example below.

Below are a few quick points about the number of threads:

  • This example creates an ExecutorService backed by a ThreadPoolExecutor, using Executors.newCachedThreadPool(). Other options include Executors.newSingleThreadExecutor() and Executors.newFixedThreadPool(10), which let you fix how many threads the pool uses.
  • You can also construct a ThreadPoolExecutor directly, for example new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()), which gives you more control over the number of threads and the type of queue. Read up on ThreadPoolExecutor before constructing it yourself instead of going through the Executors factory methods.
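To illustrate the second point, here is a minimal sketch of constructing the pool directly. The class name, the helper names, the bounded ArrayBlockingQueue and the CallerRunsPolicy are my own choices for illustration, not something your use case requires; the idea is only to show where the thread count and the queue type are decided.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {

    // Hypothetical helper: a fixed-size pool with a bounded work queue.
    // When the queue is full, CallerRunsPolicy makes the submitting thread
    // run the task itself, which slows down submission instead of failing.
    static ThreadPoolExecutor newBoundedPool(int nThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits n small tasks (placeholders for real work) and sums their results.
    static int sumOfSquares(int n) {
        ThreadPoolExecutor pool = newBoundedPool(4, 8);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                futures.add(pool.submit(() -> value * value));
            }
            int sum = 0;
            for (Future<Integer> future : futures) {
                sum += future.get(); // blocks until this task has completed
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(100));
    }
}
```

A bounded queue keeps memory use predictable when tasks are produced faster than they are executed, whereas the unbounded LinkedBlockingQueue never rejects a task.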

A few other points related to your implementation:

  • Processing the Future objects is a blocking operation, because Future.get() is a blocking call. Your main thread stays blocked until every Future has completed and been processed.
  • If you do not want to block, there are several options. One is to dedicate a separate thread to processing the Future objects, which frees your main thread. Another is not to process the Future objects at all: as soon as you call executorService.submit(callableTask1), your thread is free, and the CallableTask itself can push its result onto a queue (choose whichever Java queue implementation fits your needs) that another thread then processes.
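The queue-based option above could look roughly like this. It is only a sketch: the class name, the "loaded:" prefix and the pool size are made up for illustration, and the string concatenation stands in for the real extraction work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class ResultQueueSketch {

    // Workers push results onto a queue; a separate consumer thread drains it.
    static List<String> loadAll(List<String> keys) {
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();
        ExecutorService workers = Executors.newFixedThreadPool(4);

        // Consumer thread: takes exactly one result per submitted task.
        // Assumption: every task produces a result; a task that failed
        // without enqueueing anything would leave this loop waiting.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < keys.size(); i++) {
                    processed.add(results.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Submitting does not block on the work itself.
        for (String key : keys) {
            workers.execute(() -> results.add("loaded:" + key));
        }
        workers.shutdown();

        // Joining here only so the demo can return the results;
        // a real submitter would carry on with other work instead.
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(loadAll(List.of("a", "b", "c")));
    }
}
```

Results arrive in completion order, not submission order, so the consumer should not rely on any particular ordering.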

Sample code:

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceFutureCallableExample {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<Future<String>> futuresList = new ArrayList<>();

        ExecutorService executorService = Executors.newCachedThreadPool();
        ExecutorServiceFutureCallableExample.CallableTask callableTask1 = new ExecutorServiceFutureCallableExample.CallableTask(2000);
        ExecutorServiceFutureCallableExample.CallableTask callableTask2 = new ExecutorServiceFutureCallableExample.CallableTask(1000);
        ExecutorServiceFutureCallableExample.CallableTask callableTask3 = new ExecutorServiceFutureCallableExample.CallableTask(3000);

        System.out.println("### Starting submitting tasks");

        // submit the callable and register the returned future object so that it can be processed later.
        futuresList.add(executorService.submit(callableTask1));
        futuresList.add(executorService.submit(callableTask2));
        futuresList.add(executorService.submit(callableTask3));

        System.out.println("### Finished submitting tasks");

        for (Future<String> future : futuresList) {
            // get() blocks until the corresponding task has completed.
            System.out.println(future.get());
        }

        // Stop accepting new tasks so the pool's idle threads do not delay JVM exit.
        executorService.shutdown();

        System.out.println("### Finished.");
    }

    static class CallableTask implements Callable<String>{

        private long timeToSleep;

        CallableTask(long _timeToSleep){
            this.timeToSleep = _timeToSleep;
        }

        @Override
        public String call() throws Exception {
            // timeToSleep is in milliseconds, and the message is built before the sleep happens.
            String str = new Date() + ": Processing - " + this.hashCode() + " | " + Thread.currentThread() + ", sleeping for ms - " + timeToSleep;
            System.out.println(str);
            Thread.sleep(timeToSleep);
            return str + " ||||| completed at: " + new Date();
        }

        public long getTimeToSleep() {
            return timeToSleep;
        }

        public void setTimeToSleep(long timeToSleep) {
            this.timeToSleep = timeToSleep;
        }

    }
}
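Applied to your case, the same pattern might be sketched as follows. This is not Ignite code: the Map parameter is a stand-in for your cache, the loader function is a stand-in for your persistence lookup, and the class and method names are hypothetical. It returns the keys whose load failed so you can extract those again, as you described.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class AsyncCacheLoadSketch {

    // Loads each key on a worker thread and puts the value into the cache.
    // Returns the keys whose load or put threw, so the caller can retry them.
    static <K, V> List<K> loadIntoCache(List<K> keys, Function<K, V> loader, Map<K, V> cache) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Map<K, Future<?>> pending = new LinkedHashMap<>();
        for (K key : keys) {
            pending.put(key, pool.submit(() -> {
                cache.put(key, loader.apply(key));
            }));
        }
        pool.shutdown(); // already submitted tasks still run to completion

        List<K> failed = new ArrayList<>();
        for (Map.Entry<K, Future<?>> entry : pending.entrySet()) {
            try {
                entry.getValue().get(); // rethrows a task failure as ExecutionException
            } catch (ExecutionException e) {
                failed.add(entry.getKey());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                failed.add(entry.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<String, Integer> cache = new ConcurrentHashMap<>(); // stand-in cache
        List<String> failed = loadIntoCache(Arrays.asList("a", "bb"), String::length, cache);
        System.out.println("cached=" + cache + ", failed=" + failed);
    }
}
```

The map passed in must be safe for concurrent writes (a ConcurrentHashMap in this sketch), since several worker threads put into it at once.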

Upvotes: 1
