coryfoo

Reputation: 191

Dataflow Batch Job Stuck in GroupByKey.create()

I have a batch Dataflow pipeline that I have run without issue many times on a subset of our data (approximately 150k rows of input). I have now attempted to run it on our full dataset of approximately 300M rows. A crucial part of the pipeline performs a GroupByKey of the input records, resulting in (I believe) ~100M keys.

The relevant part of the pipeline looks like this:

// Results in map of ~10k elements, 160MB in size
PCollectionView<Map<String, Iterable<Z>>> sideData = ...

...

.apply(ParDo.named("Group into KV").of(
    new DoFn<T, KV<String, T>>() { ... }
))
.apply("GBK", GroupByKey.create())
.apply(ParDo.named("Merge Values").withSideInputs(sideData).of(
    new DoFn<KV<String, Iterable<T>>, V>() { ... }
))

I have run this pipeline twice, and each time the job has stalled after running fine for more than 16 hours. The first run used 10 n1-highmem-8 instances and the second used 6 n1-highmem-16 instances.

I can tell from the Dataflow job console that the Group into KV ParDo completes just fine and outputs 101,730,100 elements with a size of 153.67 GB. The step detail for the GBK transform says that 72,091,846 and 72,495,353 elements were added in the first and second attempts, respectively. At that point the GBK transform is still in the running phase, but the CPU on all the machines drops to zero and the pipeline is effectively stalled; all downstream stages stop incrementing their element counts. I can ssh into the machines and look at the various logs under /var/log/dataflow/, but there doesn't appear to be anything out of the ordinary. There are no errors in the cloud console, and the GC logs don't seem to indicate memory issues.

At this point I'm at a bit of a loss as to what to do next. I have read that using a Combine transform instead of the GroupByKey could yield better scalability, and with a little refactoring I could make the code commutative (and associative) so that a Combine would be an option. I'm somewhat hesitant to attempt that, as each time I have tried to run this pipeline it has cost me ~$250 in wasted cloud compute time.
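For what it's worth, the scalability win from a Combine comes from the runner being able to merge partial results on each worker before the shuffle, which is exactly why the merge step must be commutative and associative. A stdlib-only sketch of that property (the `merge` function here is a hypothetical stand-in for whatever the refactored code would compute, not anything from the actual pipeline):

```java
import java.util.Arrays;
import java.util.List;

public class CombineSketch {

    // Hypothetical merge step. For Combine.perKey to be applicable, merging
    // any two partial results must give the same answer regardless of how
    // the values were grouped or ordered.
    static long merge(long a, long b) {
        return Math.max(a, b);
    }

    // Fold a list of values down to one result using the merge step.
    static long combineAll(List<Long> values) {
        long acc = Long.MIN_VALUE;
        for (long v : values) {
            acc = merge(acc, v);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(3L, 9L, 1L, 7L);

        // Combining the whole group at once...
        long whole = combineAll(values);

        // ...must equal combining two partial groups and then merging the
        // partial results -- which is what lets the runner pre-combine on
        // each worker instead of shuffling every value for a key.
        long split = merge(combineAll(values.subList(0, 2)),
                           combineAll(values.subList(2, 4)));

        System.out.println(whole + " " + split); // 9 9
    }
}
```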

My questions are:

IDs of the aforementioned jobs:

Upvotes: 2

Views: 2051

Answers (1)

Ben Chambers

Reputation: 6130

It looks like one worker may be either stuck or taking a long time to run the DoFn code after the GroupByKey. The most likely cause of this is a "hot key" (having significantly more values than other keys). You could add an Aggregator to the DoFn and report the size of the Iterable while running, something like this:

private static class MyDoFn extends DoFn<KV<String, Iterable<T>>, V> {

  private static final Logger LOG =
      LoggerFactory.getLogger(MyDoFn.class);
  private final Aggregator<Long, Long> maxIterableSize =
      createAggregator("maxIterableSize", new Max.MaxLongFn());

  @Override
  public void processElement(ProcessContext c) {
    long numElements = 0;
    for (T value : c.element().getValue()) {
      // Count elements as we go; the aggregator reports the largest count
      // seen for any single key across the whole job.
      maxIterableSize.addValue(++numElements);
      if (numElements == 100) {
        LOG.warn("Key {} has > 100 values", c.element().getKey());
      }
      ... // the rest of your code inside the loop
    }
  }
}

The above will add a counter showing the maximum number of elements on a single key, and also report to Cloud Logging any key that has more than 100 values (feel free to adjust the threshold as seems reasonable -- the single hot key likely has many more elements than any other key).
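The quantity that counter reports is simply the size of the largest per-key group. The same check can be sketched offline with just the standard library, which may be useful for spot-checking a small sample of keyed records before paying for another run (the keys here are made up for illustration):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class HotKeyCheck {
    public static void main(String[] args) {
        // Hypothetical sample of the keys emitted by the "Group into KV"
        // step; in practice these would come from a sampled export of
        // the real data.
        List<String> keys = Arrays.asList("a", "b", "a", "a", "c", "b");

        // Count how many values land on each key...
        Map<String, Long> counts = keys.stream()
            .collect(Collectors.groupingBy(k -> k, Collectors.counting()));

        // ...and report the largest group, which is what the
        // maxIterableSize aggregator above measures at run time.
        long maxGroup = Collections.max(counts.values());
        System.out.println(maxGroup); // 3 (key "a")
    }
}
```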

The other possibility is that there is something in the code for that DoFn that is either hanging or really slow on some specific set of data. You could try connecting to the worker that is processing this one item and seeing what it is working on (using kill -QUIT <pid> as you mentioned).

Upvotes: 2
