eric
eric

Reputation: 2759

Java Fork Join Pool getting stuck

I have a data migration service that reads stuff from one database in chunks then migrates it into another database.

To work through the 'Chunks' of data- I am trying to use a RecursiveAction and fork join pool. There reason for this is I would like to work to execute the work on these "chunks" in parallel, then get another chunk, then execute, then get another chunk, etc.

What's happening is my process just stops. I see no exceptions in the logs, and I see no deadlocked threads. My code is below, my questions are:

ForkJoinPool-1-worker-18 id=12191 state=WAITING - waiting on <0x1b5ca93e> (a java.util.concurrent.ForkJoinPool) - locked <0x1b5ca93e> (a java.util.concurrent.ForkJoinPool) at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.ForkJoinPool.tryAwaitWork(ForkJoinPool.java:864) at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:647) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)

Code:

@Component
public class BulkMigrationService {

final ForkJoinPool pool = new ForkJoinPool();

private static final Logger log = LoggerFactory.getLogger(BulkMigrationService.class);

private SourceDataApi api;
private final Migrator migrator;
private MetadataService metadataService;

@Autowired
public BulkMigrationService(SourceDataApi api, Migrator migrator, MetadataService metadataService) {
    this.api = api;
    this.migrator = migrator;
    this.metadataService = metadataService;
}

public void migrate(Integer batchSize, Long max) throws MigrationException {

    Long currentCount = 0l;
    Integer currentIndex = 0;

    while (currentCount < max) {
        List<String> itemsToMigrate = api.findItemRange(currentIndex, currentIndex + batchSize);

        if (assetsToMigrate.size() > 0) {
            MigrateForkedWorker starter = new MigrateForkedWorker(assetsToMigrate);
            pool.invoke(starter);
        }

        currentCount += assetsToMigrate.size();
        currentIndex += batchSize - 1;
        if (log.isDebugEnabled()) {
            log.debug("Migrated " + currentCount + " Items.");
        }
    }

}

public class MigrateForkedWorker extends RecursiveAction {

    private int max = 10;
    private List<String> allItems;

    public MigrateForkedWorker(List<String> allItems) {
        this.allItems = allItems;

    }

    @Override
    protected void compute() {
        if (allItems.size() <= max) {
            for (String itemInfo : allItems) {

                try {
                    migrator.migrateAsset(itemInfo);

                }
                catch (MigrationException e) {
                    e.printStackTrace();
                }
            }
        }
        else {
            int targetSize = allItems.size() % 2 == 0 ? allItems.size() / 2 : (allItems.size() + 1) / 2;
            List<List<String>> splits = Lists.partition(allItems, targetSize);
            MigrateForkedWorker migrateForkedWorkerOne = new MigrateForkedWorker(splits.get(0));
            MigrateForkedWorker migrateForkedWorkerTwo = new MigrateForkedWorker(splits.get(1));

            invokeAll(migrateForkedWorkerOne, migrateForkedWorkerTwo);
        }
    }
}
 }

Upvotes: 2

Views: 2327

Answers (1)

edharned
edharned

Reputation: 1904

Your first problem is that you are using invokeAll(). That simply submits new requests to the pool and waits for them to complete. Follow the example in the JavaDoc: use fork() migrateForkedWorkerOne.fork(); migrateForkedWorkerTwo.compute(); migrateForkedWorkerOne.join();

Upvotes: 1

Related Questions