Francesco Sgaramella

Reputation: 1017

Scheduling periodic threads with priority

I have to implement a multithreaded application in Java 8.

My application should run some Threads periodically, let's say once every 30 minutes. Each Thread should call an API, fetch the data from it and save it to the database. The database is organized as follows:

Table A
Table B
Table C
Table D
Table E
Table F
Table G
Table H

Tables F, G and H have foreign keys to Tables A, B, C, D and E.

This leads to one Thread for each table to update:

Thread A -> Table A
Thread B -> Table B
Thread C -> Table C
Thread D -> Table D
Thread E -> Table E
Thread F -> Table F
Thread G -> Table G
Thread H -> Table H

Since some tables have foreign keys to other tables, the Threads should not all start at the same time: Threads A, B, C, D and E should start first, and only once they have finished should Threads F, G and H start.

I have searched a lot on the internet for a possible solution, but I haven't found the right one for my situation.

For the moment I have implemented this with a ScheduledThreadPoolExecutor, whose scheduleWithFixedDelay method ensures that the Threads are executed once every 30 minutes. Even though I schedule every Thread with a different priority, they all start at the same time and I get exceptions when executing inserts/updates on the tables with foreign keys.

Here is a snippet of my code that might help to understand.

The Thread-like classes look like this:

public class AllarmReasonService implements Runnable {

    @Override
    public void run() {
        AllarmReasonDAO dao = new AllarmReasonDAO();
        PagedResultContainer<AllarmReason> allarmReasonContainer = AllarmReasonApi.getInstance().getAllarmReasons();
        for(Iterator<AllarmReason> iterator = allarmReasonContainer.getData().iterator(); iterator.hasNext();){
            AllarmReason allarmReason = iterator.next();
            dao.insertOrUpdateAllarmReason(allarmReason);
        }
    }
}

The threadExecutor:

public void threadExecutor(){
    initializeThreadMap();
    Stream<Entry<String, Integer>> sorted = threadTimerMap.entrySet().stream().sorted(Entry.comparingByValue());
    scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadTimerMap.size());

    for(Iterator<Entry<String, Integer>> entryIterator = sorted.iterator(); entryIterator.hasNext();){
        Entry<String, Integer> entry = entryIterator.next();
        Runnable service = threadMap.get(entry.getKey());
        Thread thread = new Thread(service);
        thread.setPriority(entry.getValue());
        scheduler.scheduleWithFixedDelay(thread, 0, 30 * 60 * 1000, TimeUnit.MILLISECONDS);
    }
}

private void initializeThreadMap(){
    threadMap.clear();
    threadMap.put("causaliAllarme", new AllarmReasonService());
    threadMap.put("causaliLavorazione", new ManufactureReasonService());
    threadMap.put("celle", new CellService());
    threadMap.put("dipendenti", new EmployeeService());
    threadMap.put("ordiniLavorazione", new ManufactureOrderService());
    threadMap.put("parzialiLavorazione", new ManufacturePartialService());
    threadMap.put("qualita", new QualityService());
    threadMap.put("qualitaRilevata", new QualityDetectionService());
}

And, finally, threadTimerMap has key = name of the module/thread (e.g. causaliAllarme) and value = priority, where the priority is higher for modules that must be updated first.

With this setup, I am not able to run the most important Threads first, even if I set a certain priority.

Consider that I am not an expert in threads and multithreading.

Any help would be appreciated. Thanks in advance.

Upvotes: 0

Views: 205

Answers (2)

OldCurmudgeon

Reputation: 65811

You could join the two functions together using a CountDownLatch.

This code just illustrates the idea - there may be tidier ways to achieve this.

import java.util.concurrent.CountDownLatch;

class SignalledRunnable implements Runnable {
    final CountDownLatch wait;
    final CountDownLatch signal;
    final Runnable it;

    public SignalledRunnable(CountDownLatch waitFor, CountDownLatch signal, Runnable it) {
        this.wait = waitFor;
        this.signal = signal;
        this.it = it;
    }

    @Override
    public void run() {
        if (wait != null) {
            try {
                // Wait for pre-condition.
                wait.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and skip the work: the pre-condition was never met.
                Thread.currentThread().interrupt();
                return;
            }
        }
        // Do it.
        it.run();
        if (signal != null) {
            // Signal post-condition.
            signal.countDown();
        }
    }
}

class A implements Runnable {
    @Override
    public void run() {
        // Update table A
    }
}

class F implements Runnable {

    @Override
    public void run() {
        // Update table F.
    }
}

public void test() {
    // Make F wait for A.
    A a = new A();
    F f = new F();
    CountDownLatch afSequencer = new CountDownLatch(1);
    // The A must signal at completion.
    Runnable ar = new SignalledRunnable(null, afSequencer, a);
    // The F should wait for that signal.
    Runnable fr = new SignalledRunnable(afSequencer, null, f);
    // You can now add ar and fr to your task queue and f will block until a completes.

}

NB: As @RealSkeptic points out, remember that the f thread will block until a completes so be sure your thread pool is large enough to handle as many blocked threads as can happen in your system.
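One thing to watch with periodic scheduling: a CountDownLatch cannot be reset, so a latch created once will only sequence the first run. A minimal sketch of one way around that, assuming you are free to drive each cycle from a single coordinating task (LatchCycle, runStage and runCycle are made-up names for illustration, not part of your code):

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

class LatchCycle {

    // Runs all tasks on the worker pool and blocks until each one has ended.
    // Returns false if the calling thread was interrupted while waiting.
    static boolean runStage(ExecutorService workers, List<Runnable> tasks) {
        // A CountDownLatch cannot be reset, so every stage of every cycle gets a new one.
        CountDownLatch done = new CountDownLatch(tasks.size());
        for (Runnable task : tasks) {
            workers.execute(() -> {
                try {
                    task.run();
                } finally {
                    done.countDown(); // count down even if the task throws
                }
            });
        }
        try {
            done.await();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    // One full cycle: stage 2 starts only after every stage-1 task has ended.
    static boolean runCycle(ExecutorService workers, List<Runnable> stage1, List<Runnable> stage2) {
        return runStage(workers, stage1) && runStage(workers, stage2);
    }
}
```

You would then schedule only the coordinating call, for example timer.scheduleWithFixedDelay(() -> LatchCycle.runCycle(workers, stage1, stage2), 0, 30, TimeUnit.MINUTES) on a single-threaded scheduler, with workers being a separate pool. Only the coordinator blocks, so the worker pool does not need spare threads for waiting tasks. Note that in this sketch a stage-1 task that throws still counts down, so stage 2 runs anyway; if that is not what you want, record failures and check them before starting stage 2.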

Upvotes: 2

Sergey Prokofiev

Reputation: 1885

For such situations I'd advise using CompletableFuture.

// Declare your pool. Also declare a ThreadFactory (ThreadFactoryBuilder here is from Guava);
// named threads are very helpful when debugging, I promise :)
int corePoolSize = 10;
ScheduledExecutorService pool = Executors.newScheduledThreadPool(corePoolSize,
      new ThreadFactoryBuilder().setNameFormat("Your-thread-%d").setDaemon(true).build());

List<CompletableFuture<Void>> dependencies = new ArrayList<>();

// Submit the threads for the first stage
dependencies.add(CompletableFuture.runAsync(new AllarmReasonService(), pool));
dependencies.add(CompletableFuture.runAsync(new ManufactureReasonService(), pool));
// ...
// do the same with all your stage-1 threads

// wait until stage 1 has completed
try {
    for (CompletableFuture<Void> f : dependencies) {
        f.get();
    }
} catch (InterruptedException | ExecutionException e) {
    // log or re-throw
    pool.shutdownNow();
}

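// stage 2: submit the services for the dependent tables here
// (the two class names below are repeated from stage 1 only as placeholders)
CompletableFuture.runAsync(new AllarmReasonService(), pool);
CompletableFuture.runAsync(new ManufactureReasonService(), pool);
// other required ...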

You can also aggregate the futures with the CompletableFuture.allOf(CompletableFuture<?>...) method and wait on a single future.
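A rough sketch of the allOf variant, chaining stage 2 onto stage 1 without blocking the caller. FutureCycle, runAll and runCycle are hypothetical names used only for this illustration, and the task lists stand in for your service Runnables:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class FutureCycle {

    // Starts every task on the pool and returns one future that
    // completes when all of them have completed.
    static CompletableFuture<Void> runAll(List<Runnable> tasks, Executor pool) {
        CompletableFuture<?>[] futures = tasks.stream()
                .map(task -> CompletableFuture.runAsync(task, pool))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    // Stage 2 is submitted only after stage 1 has completed normally.
    // If any stage-1 task throws, stage 2 is skipped and the returned
    // future completes exceptionally.
    static CompletableFuture<Void> runCycle(List<Runnable> stage1, List<Runnable> stage2, Executor pool) {
        return runAll(stage1, pool).thenCompose(ignored -> runAll(stage2, pool));
    }
}
```

To repeat it, schedule a task that calls FutureCycle.runCycle(stage1, stage2, pool).join() inside a try/catch; the catch matters because scheduleWithFixedDelay suppresses all further runs once an execution throws. Skipping stage 2 on a stage-1 failure seems to fit the foreign-key constraint, but that is a design choice you may want to change.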

Hope it helps!

Upvotes: 2
