Reputation: 827
Consider the following scenario:
We want to take a large distributed collection of objects, and for each object in the collection we want to kick off another computation. That computation uses the current object and a second large distributed collection to compute a result that transforms the current object.
E.g.
collection A: 1,2,3,4,5,6,7,8......
collection B: 1,2,3,4,5,6,7,8......
For each value in A, we iterate over all the values in B, multiplying each by 2 and summing these values; we then map each value in A to this sum multiplied by the A value itself. E.g. with B = 1,2,3 the doubled sum is 12, so the value 2 in A maps to 24.
Below is my attempt. It deadlocks when the following is used:
c2.newJob(p2).join()
There is no deadlock when the following is used:
c2.newJob(p2)
However, we need p2 to complete before reading b-out, to ensure we get the correct sum.
This might seem like a non-idiomatic way of using Jet for this specific use case; however, I want to use this pattern to solve other problems, so I would greatly appreciate your help with this.
JetInstance jet = Jet.newJetInstance();
JetInstance c1 = Jet.newJetClient();

Pipeline p1 = Pipeline.create();
List<Integer> aIn = jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);

p1.drawFrom(Sources.list("a-in"))
  .map(e -> {
      Pipeline p2 = Pipeline.create();
      JetInstance c2 = Jet.newJetClient();
      List<Integer> bIn = c2.getList("b-in");
      bIn.add(1);
      bIn.add(2);
      bIn.add(3);
      p2.drawFrom(Sources.list("b-in"))
        .map(i -> ((Integer) i) * 2)
        .drainTo(Sinks.list("b-out"));
      List<Integer> bOut = c2.getList("b-out");
      // I would have thought it should just wait for the computation to
      // complete; instead the join here causes Jet to block itself
      c2.newJob(p2).join();
      int sum = 0;
      for (Integer i : bOut) {
          sum += i;
      }
      return ((Integer) e) * sum;
  })
  .drainTo(Sinks.list("a-out"));

c1.newJob(p1).join();
Upvotes: 0
Views: 277
Reputation: 10812
There are multiple problems in your code:

1. The map function should not block. In the upcoming version we're adding mapUsingContextAsync, where you can use the client connection as a context, submit the job and return job.getFuture().

2. The map operations will run in parallel, so you need to ensure they don't share the temporary list. In your sample, all sub-jobs use b-out and overwrite each other's data (one simple way around this is sketched after this list).
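For illustration, a rough sketch of that per-job isolation, giving each sub-job its own sink name (the outName variable is hypothetical; the full code below takes a different route and keys results by UUID into a shared map):

// Hypothetical: a unique sink per sub-job, so parallel map calls
// don't overwrite each other in a shared "b-out" list
String outName = "b-out-" + UUID.randomUUID();
p2.drawFrom(Sources.list("b-in"))
  .map(i -> ((Integer) i) * 2)
  .drainTo(Sinks.list(outName));
// remember to destroy the temporary list once the sum has been read:
// c2.getList(outName).destroy();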
The cause of the deadlock was this: the join() in map() blocked the cooperative worker thread while waiting for the sub-job to complete, but the sub-job could not complete precisely because that cooperative worker thread was blocked.
Further, Jet is not optimized for very small batch jobs, but I guess your actual job is larger. There is quite some overhead to deploying a job; if the job itself only runs for a few milliseconds, the overhead is substantial. In this specific case you'd be better off just computing the sum locally, e.g. with list.stream().mapToInt(i -> i * 2).sum(), instead of a sub-job.
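For example, a minimal sketch of that local computation, assuming b-in is fully populated before the job is submitted (the doubled sum is computed once per processor, in the context-create function):

p1.drawFrom(Sources.list("a-in"))
  .mapUsingContext(
      // blocking iteration of b-in happens once, at context creation
      ContextFactory.withCreateFn(inst ->
          inst.<Integer>getList("b-in").stream().mapToInt(i -> i * 2).sum()),
      (sum, e) -> ((Integer) e) * sum)
  .drainTo(Sinks.list("a-out"));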
// static imports assumed:
// import static com.hazelcast.jet.Util.entry;
// import static com.hazelcast.jet.aggregate.AggregateOperations.summingLong;
// import static com.hazelcast.jet.datamodel.Tuple2.tuple2;
JetInstance jet = Jet.newJetInstance();
JetInstance c1 = Jet.newJetClient();

Pipeline p1 = Pipeline.create();
List<Integer> aIn = jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);
List<Integer> bIn = jet.getList("b-in");
bIn.add(1);
bIn.add(2);
bIn.add(3);

p1.drawFrom(Sources.list("a-in"))
  .mapUsingContextAsync(
      ContextFactory
          .withCreateFn(inst -> tuple2(inst, inst.<UUID, Long>getMap("tmpResults")))
          // mark as non-cooperative, job submission does some blocking
          .toNonCooperative()
          .withLocalSharing()
          .withMaxPendingCallsPerProcessor(2)
          .withDestroyFn(ctx -> ctx.f1().destroy()),
      (ctx, item) -> {
          Pipeline p2 = Pipeline.create();
          JetInstance instance = ctx.f0();
          // a unique key per call, so parallel sub-jobs don't overwrite
          // each other's results
          UUID key = UUID.randomUUID();
          IMapJet<UUID, Long> tmpResultsMap = ctx.f1();
          p2.drawFrom(Sources.list("b-in"))
            .map(i -> ((Integer) i) * 2L)
            .aggregate(summingLong(Long::longValue))
            .map(sum -> entry(key, sum))
            .drainTo(Sinks.map(tmpResultsMap));
          // return the future instead of joining, so the worker isn't blocked
          return instance.newJob(p2).getFuture()
                         .thenApply(r -> entry(item, tmpResultsMap.remove(key)));
      })
  .drainTo(Sinks.list("a-out"));

c1.newJob(p1).join();
jet.getList("a-out").forEach(System.out::println);
This prints the following output:
1=12
2=12
3=12
The above code works in the current snapshot and should work in Jet 3.0, which is due in a few weeks.
Upvotes: 2
Reputation: 3257
@newlogic, try this approach:

1. Submit a job that reads b-in and writes to a b-out map, not a list. You can use a known key, or just use a timestamp etc. as the key, and define a TTL on that map to remove old results.

2. Add a local entry listener on the b-out map (a local listener, so only the node that holds the updated key will be notified) for entryAdded/entryUpdated events, depending on what you chose in the first step, and submit a new job from that listener method for processing a-in. A rough sketch follows below.

This way you don't need to wait: once the first job is completed, it will trigger the second job automatically.
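A minimal sketch of that wiring, assuming a known String key and Hazelcast's EntryAddedListener (the follow-up pipeline here is hypothetical):

// import com.hazelcast.map.listener.EntryAddedListener;
IMapJet<String, Long> bOut = jet.getMap("b-out");
bOut.addLocalEntryListener((EntryAddedListener<String, Long>) event -> {
    long sum = event.getValue();
    // second job: transform a-in using the sum produced by the first job
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.list("a-in"))
     .map(a -> ((Integer) a) * sum)
     .drainTo(Sinks.list("a-out"));
    jet.newJob(p); // submitted from the listener, no blocking join needed
});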
Upvotes: 1