Milind

Reputation: 61

Sink to java list possible with Hazelcast Jet?

I have a list of accounts and perform a hash-join with ticks to return the accounts enriched with tick data. After the hashJoin I drainTo an IListJet, then read that list back with DistributedStream and return it.

public List<Account> populateTicksInAccounts(List<Account> accounts) {
    ...
    ...
    Pipeline p = Pipeline.create();
    BatchSource<Tick> ticksSource = Sources.list(TICKS_LIST_NAME);
    BatchSource<Account> accountSource = Sources.fromProcessor(AccountProcessor.of(accounts));

    p.drawFrom(ticksSource)
        .hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
        .drainTo(Sinks.list(TEMP_LIST));

    jet.newJob(p).join();
    IListJet<Account> list = jet.getList(TEMP_LIST);
    return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());
}

Is it possible to drainTo a plain java List instead of an IListJet after performing the hashJoin?

Is something like the following possible?

List<Account> accountWithTicks = new ArrayList<>();
p.drawFrom(ticksSource)
    .hashJoin(p.drawFrom(accountSource), JoinClause.joinMapEntries(Tick::getTicker), accountMapper())
    .drainTo(<CustomSinkProcessor(accountWithTicks)>);
return accountWithTicks;

where the CustomSinkProcessor would take the empty Java list and fill it with the accounts?

Upvotes: 1

Views: 234

Answers (2)

Marko Topolnik

Reputation: 200148

Keep in mind that the code you submit to Jet for execution runs outside the process where you submit it from. While it would be theoretically possible to provide the API you're asking for, under the hood it would just have to perform some tricks to run the code on each member of the cluster, let all members send their results to one place, and fill up a list to return to you. It would go against the nature of distributed computing.

If you think it will help the readability of your code, you can write a helper method such as this:

public <T> List<T> drainToList(GeneralStage<T> stage) {
    // Use a unique name so concurrent jobs don't clash on the same temp list
    String tmpListName = randomListName();
    SinkStage sinkStage = stage.drainTo(Sinks.list(tmpListName));
    IListJet<T> tmpList = jet.getList(tmpListName);
    try {
        jet.newJob(sinkStage.getPipeline()).join();
        // Copy into a plain, local ArrayList before destroying the cluster-side list
        return new ArrayList<>(tmpList);
    } finally {
        tmpList.destroy();
    }
}
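The randomListName() helper isn't defined in the snippet; a minimal hypothetical implementation, assuming any collision-safe string will do (a UUID-based name is one common choice):

```java
import java.util.UUID;

public class ListNames {
    // Hypothetical sketch of the randomListName() helper referenced above:
    // any name unlikely to collide with other jobs' temp lists works.
    static String randomListName() {
        return "tmp-list-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(randomListName());
    }
}
```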

Especially note the line

return new ArrayList<>(tmpList);

as opposed to your

IListJet<Account> list = jet.getList(TEMP_LIST);
return DistributedStream.fromList(list).collect(DistributedCollectors.toIList());

Your version just copies one Hazelcast list into another and returns a handle to it. Now you have leaked two lists in the Jet cluster: they don't automatically disappear when you stop using them.
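To see why the copy matters, here's a plain-Java illustration (no Jet involved): a handle keeps tracking later changes to the backing list, while new ArrayList<>(...) takes an independent local snapshot.

```java
import java.util.ArrayList;
import java.util.List;

public class CopyVsHandle {
    public static void main(String[] args) {
        // The backing list stands in for the cluster-side IListJet
        List<String> backing = new ArrayList<>(List.of("AAPL", "GOOG"));
        List<String> handle = backing;                    // like returning jet.getList(...)
        List<String> snapshot = new ArrayList<>(backing); // like new ArrayList<>(tmpList)

        backing.clear(); // simulates tmpList.destroy()
        System.out.println(handle.isEmpty());  // true: the handle sees the destroyed list
        System.out.println(snapshot.size());   // 2: the snapshot is an independent copy
    }
}
```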

Even the code I provided can still be leaky: the JVM process running it can die during Job.join() without ever reaching the finally block, and then the temporary list lingers on.

Upvotes: 2

Oliv

Reputation: 10812

No, it's not, due to the distributed nature of Jet: the sink executes in multiple parallel processors (workers), possibly on multiple cluster members, so it can't add to a plain Collection living in your client's JVM. The sink has to be able to insert items on every member of the cluster.

Upvotes: 0
