newlogic

Reputation: 827

Is it possible to nest Hazelcast Jet pipelines such that an inner pipeline can compute results for an outer pipeline?

Consider the following scenario:

We want to take a large distributed collection of objects, and for each object in the collection we want to kick off another computation that uses the current object together with another large distributed collection, computing a result that transforms the current object.

E.g.

collection A: 1,2,3,4,5,6,7,8......

collection B: 1,2,3,4,5,6,7,8......

For each value in A, we iterate over all the values in B, multiplying each by 2 and summing them; we then map each value in A to this sum multiplied by the current A value.
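A minimal plain-Java sketch of the intended computation (no Jet involved; the class and method names are mine, for illustration only):

```java
import java.util.List;

public class NestedComputation {
    // For each a in A: a * sum(2 * b for all b in B)
    static int transform(int a, List<Integer> b) {
        int doubledSum = b.stream().mapToInt(i -> i * 2).sum();
        return a * doubledSum;
    }

    public static void main(String[] args) {
        List<Integer> a = List.of(1, 2, 3);
        List<Integer> b = List.of(1, 2, 3);
        // the sum of the doubled B values is 12, so A maps to 12, 24, 36
        a.forEach(x -> System.out.println(x + " -> " + transform(x, b)));
    }
}
```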

Below is my attempt which results in a deadlock when the following is used:

c2.newJob(p2).join()

There is no deadlock when the following is used:

c2.newJob(p2)

However, we want p2 to complete to ensure we get the correct sum.

This might seem like a non-idiomatic way of using Jet for this specific use case; however, I want to use this pattern to solve other problems, so I would greatly appreciate your help with this.

JetInstance jet = Jet.newJetInstance();
JetInstance c1 = Jet.newJetClient();

Pipeline p1 = Pipeline.create();

List<Integer> aIn = jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);

p1.drawFrom(Sources.list("a-in"))
        .map(e -> {
          Pipeline p2 = Pipeline.create();
          JetInstance c2 = Jet.newJetClient();

          List<Integer> bIn = c2.getList("b-in");
          bIn.add(1);
          bIn.add(2);
          bIn.add(3);

          p2.drawFrom(Sources.list("b-in"))
                  .map(i -> ((Integer) i) * 2)
                  .drainTo(Sinks.list("b-out"));

          List<Integer> bOut = c2.getList("b-out");

          // I would have thought it would just wait for the computation to
          // complete; instead the join here causes Jet to block itself.
          c2.newJob(p2).join();

          int sum = 0;
          for (Integer i : bOut) {
            sum += i;
          }

          return ((Integer) e) * sum;
        }).drainTo(Sinks.list("a-out"));
c1.newJob(p1).join();

Upvotes: 0

Views: 277

Answers (2)

Oliv

Reputation: 10812

There are multiple problems in your code:

  1. The map function should not block. In the upcoming version we're adding mapUsingContextAsync, where you can use the client connection as a context, submit the job, and return job.getFuture().

  2. The map operations will run in parallel, so you need to ensure they don't share the temporary list. In your sample, all sub-jobs use b-out and overwrite each other's data.

  3. The cause of the deadlock was this: the join() in map() blocked the cooperative worker thread while waiting for the sub-job to complete, but the sub-job could not complete because that same cooperative worker thread was blocked.

Further, Jet is not optimized for very small batch jobs, though I guess your actual job is larger. There is quite some overhead to deploying a job; if the job itself only runs for a few milliseconds, the overhead is substantial. In this specific case you'd be better off just using list.stream().map(i -> i * 2).sum() instead of a sub-job.

// Static imports assumed: Tuple2.tuple2, Util.entry, AggregateOperations.summingLong
JetInstance jet = Jet.newJetInstance();
JetInstance c1 = Jet.newJetClient();

Pipeline p1 = Pipeline.create();

List<Integer> aIn = jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);

List<Integer> bIn = jet.getList("b-in");
bIn.add(1);
bIn.add(2);
bIn.add(3);

p1.drawFrom(Sources.list("a-in"))
  .mapUsingContextAsync(
          ContextFactory
                  .withCreateFn(inst -> tuple2(inst, inst.<UUID, Long>getMap("tmpResults")))
                  // mark as non-cooperative, job submission does some blocking
                  .toNonCooperative()
                  .withLocalSharing()
                  .withMaxPendingCallsPerProcessor(2)
                  .withDestroyFn(ctx -> ctx.f1().destroy()),
          (ctx, item) -> {
              Pipeline p2 = Pipeline.create();
              JetInstance instance = ctx.f0();
              UUID key = UUID.randomUUID();
              IMapJet<UUID, Long> tmpResultsMap = ctx.f1();

              p2.drawFrom(Sources.list("b-in"))
                .map(i -> ((Integer) i) * 2L)
                .aggregate(summingLong(Long::longValue))
                .map(sum -> entry(key, sum))
                .drainTo(Sinks.map(tmpResultsMap));

              return instance.newJob(p2).getFuture()
                             .thenApply(r -> entry(item, tmpResultsMap.remove(key)));
          })
  .drainTo(Sinks.list("a-out"));

c1.newJob(p1).join();
jet.getList("a-out").forEach(System.out::println);

This prints the following output:

1=12
2=12
3=12

The above code works on the current snapshot and should work in Jet 3.0, which is due in a few weeks.

Upvotes: 2

Gokhan Oner

Reputation: 3257

@newlogic, try this approach:

  1. Create a job that reads from b-in & writes to a b-out map, not a list. You can use a known key, or just use a timestamp etc. as the key, & define a TTL on that map to remove old results.
  2. Create a listener on the b-out map (a local listener, so only the node that holds the updated key is notified) that listens for entryAdded/entryUpdated events, depending on what you chose in the first step, & submits a new job from the listener method to process a-in.

This way you don't need to wait: once the first job completes, it triggers the second job automatically.
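A rough plain-Java sketch of this hand-off pattern (no Hazelcast dependency; the ListeningMap class is my stand-in for a local entry listener on the b-out map, and all names are mine):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

public class ListenerHandoff {
    // Stand-in for the b-out map: invokes a callback on put,
    // mimicking a local entryAdded listener.
    static class ListeningMap<K, V> {
        private final Map<K, V> map = new ConcurrentHashMap<>();
        private final BiConsumer<K, V> onEntryAdded;

        ListeningMap(BiConsumer<K, V> onEntryAdded) {
            this.onEntryAdded = onEntryAdded;
        }

        void put(K key, V value) {
            map.put(key, value);
            onEntryAdded.accept(key, value); // triggers the next stage
        }
    }

    static final StringBuilder log = new StringBuilder();

    public static void main(String[] args) {
        // The second stage is submitted from the listener, so nothing blocks.
        ListeningMap<String, Long> bOut = new ListeningMap<>((key, sum) ->
                log.append("second job started with sum=").append(sum));

        // The first job finishes and writes its result, triggering the hand-off.
        bOut.put("result", 12L);
        System.out.println(log);
    }
}
```

The point of the design is that neither stage ever waits on the other: the first job's completion is signaled by the write itself, which is what avoids blocking a worker thread with join().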

Upvotes: 1
