Reputation: 191
I have a batch Dataflow pipeline that I have run without issue many times on a subset of our data (approximately 150k rows of input). I have now attempted to run it on our full dataset of approximately 300M rows. A crucial part of the pipeline performs a GroupByKey of the input records, resulting in (I believe) ~100M keys.
The relevant part of the pipeline looks like this:
// Results in a map of ~10k elements, 160 MB in size
PCollectionView<Map<String, Iterable<Z>>> sideData = ...

...
    .apply(ParDo.named("Group into KV").of(
        new DoFn<T, KV<String, T>>() { ... }
    ))
    .apply("GBK", GroupByKey.create())
    .apply(ParDo.named("Merge Values").withSideInputs(sideData).of(
        new DoFn<KV<String, Iterable<T>>, V>() { ... }
    ))
I have run this pipeline twice, and each time the job has stalled after running fine for more than 16 hours. The first run used 10 n1-highmem-8 instances and the second used 6 n1-highmem-16 instances.
I can tell from the Dataflow job console that the Group into KV ParDo completes just fine and outputs 101,730,100 elements with a size of 153.67 GB. The step detail for the GBK transform says that 72,091,846 and 72,495,353 elements were added in the first and second attempts, respectively. At this point the GBK transform is still in the running phase, but the CPU on all the machines drops to zero and the pipeline is effectively stalled. All later stages in the pipeline stop incrementing their element counts. I can ssh into the machines to look at the various logs under /var/log/dataflow/, but there doesn't appear to be anything out of the ordinary. There are no errors in the cloud console, and the GC logs don't seem to indicate memory issues.
At this point I'm at a bit of a loss as to what to do next. I have read that using a Combiner instead of the GroupByKey could yield better scalability, and with a little refactoring I could make the code commutative so that a Combiner would be an option. I'm somewhat hesitant to attempt that, as each attempt to run this pipeline has cost me ~$250 in wasted cloud compute time.
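For reference, here is a rough sketch of what I imagine the Combine-based version would look like. MergeFn, Accum, T, and V are placeholders rather than our real types, and this ignores the sideData side input for simplicity:

private static class MergeFn extends Combine.CombineFn<T, Accum, V> {
  @Override
  public Accum createAccumulator() {
    // Accum would need a coder (e.g. implement Serializable or register one).
    return new Accum();
  }

  @Override
  public Accum addInput(Accum accum, T input) {
    // Fold a single value into the accumulator; must be order-insensitive.
    return accum.add(input);
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    // Combine partial results produced on different workers.
    Accum merged = new Accum();
    for (Accum a : accums) {
      merged = merged.mergeWith(a);
    }
    return merged;
  }

  @Override
  public V extractOutput(Accum accum) {
    return accum.toV();
  }
}

...
    .apply(ParDo.named("Group into KV").of(
        new DoFn<T, KV<String, T>>() { ... }
    ))
    // Replaces the GBK + "Merge Values" steps with a single combining step.
    .apply("Merge Values", Combine.<String, T, V>perKey(new MergeFn()))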
My questions are:
1. What are the recommended ways to figure out what the pipeline is doing when it appears to be stalled? Should I do a kill -QUIT <pid> on the java process to get a stack trace, and if so, where would it go?
2. Does anyone have any theories about why this pipeline would suddenly stall without any errors or warnings?
IDs of the aforementioned jobs:
Upvotes: 2
Views: 2051
Reputation: 6130
It looks like one worker may be either stuck or taking a long time to run the DoFn code after the GroupByKey. The most likely cause of this is a "hot key" (a key having significantly more values than the other keys). You could add an Aggregator to the DoFn and report the size of the Iterable while running, something like this:
private static class MyDoFn extends DoFn<KV<String, Iterable<T>>, V> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);

  private final Aggregator<Long, Long> maxIterableSize =
      createAggregator("maxIterableSize", new Max.MaxLongFn());

  @Override
  public void processElement(ProcessContext c) {
    long numElements = 0;
    for (T value : c.element().getValue()) {
      maxIterableSize.addValue(++numElements);
      if (numElements == 100) {
        LOG.warn("Key {} has at least 100 values", c.element().getKey());
      }
      ... // the rest of your code inside the loop
    }
  }
}
The above will add a counter showing the maximum number of elements on a single key, and also report to Cloud Logging any key that has at least 100 values (feel free to adjust the threshold as seems reasonable -- the single hot key likely has many more elements than any other key).
The other possibility is that there is something in the code for that DoFn that is either hanging or really slow on some specific set of data. You could try connecting to the worker that is processing that one item and seeing what it is working on (using kill -QUIT <pid> as you mentioned).
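Another way to narrow down which data is slow would be to time each element inside the DoFn and log anything that takes unusually long. A rough sketch of the body of processElement (the 60-second threshold is arbitrary):

@Override
public void processElement(ProcessContext c) {
  long startMillis = System.currentTimeMillis();

  ... // your existing per-element processing

  long elapsedMillis = System.currentTimeMillis() - startMillis;
  if (elapsedMillis > 60_000L) {
    // Log any element that took more than a minute to process.
    LOG.warn("Slow element for key {}: took {} ms",
        c.element().getKey(), elapsedMillis);
  }
}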
Upvotes: 2