Reputation: 1951
I want to load streaming data, add a key to each element, and then count the elements by key.
My Apache Beam Dataflow pipeline gets a memory error when I try to load and group-by-key a large amount of data using the streaming approach (unbounded data). It seems the data accumulates in the group-by step and is not emitted earlier by the triggers on each window.
If I decrease the element size (the element count stays the same), it works, because the group-by step apparently waits for all the data to be grouped before firing the newly windowed data.
I tested with both:
beam version 2.11.0 and scio version 0.7.4
beam version 2.6.0 and scio version 0.6.1
As you can see, the data accumulates in the group-by step and does not get emitted.
// Session windows with a 1 ms gap; fire early after either 10 elements or
// 1 ms of processing time, until the watermark passes the end of the window.
val windowedData = data.applyKvTransform(
  Window.into[myt](Sessions.withGapDuration(Duration.millis(1)))
    .triggering(
      Repeatedly.forever(
        AfterFirst.of(
          AfterPane.elementCountAtLeast(10),
          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1))))
        .orFinally(AfterWatermark.pastEndOfWindow()))
    .withAllowedLateness(Duration.standardSeconds(100))
    .discardingFiredPanes()
)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Is there any way to solve this memory problem, perhaps by forcing group-by to emit early results for each window?
Upvotes: 4
Views: 2863
Reputation: 101
The KeyCommitTooLargeException is not a memory problem but a protobuf serialization problem. Protobuf has a 2 GB limit for a single object (google protobuf maximum size). Dataflow found that the value of a single key in the pipeline was larger than 2 GB, so it couldn't shuffle the data. The error message indicates that "This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element." Based on your pipeline setup (i.e., randomly assigned keys), the latter is more likely.
The pipeline may have read a large file (>2 GB) from GCS and assigned it to a random key. GroupByKey requires a key shuffle operation, which Dataflow could not perform because of the protobuf limitation, so it got stuck on that key and held the watermark back.
If a single key has a large value, you may want to reduce the value size, for example by compressing the string, splitting the string across multiple keys, or generating smaller GCS files in the first place.
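A minimal sketch of the splitting idea in Scio (the String key/value types, the `#` separator, and the 1 MB chunk size are illustrative assumptions, not from your pipeline):

    import com.spotify.scio.values.SCollection

    // Split one large value into fixed-size chunks, each under its own
    // sub-key, so no single key/value commit approaches the 2 GB limit.
    def splitLargeValue(data: SCollection[(String, String)],
                        chunkSize: Int = 1 << 20): SCollection[(String, String)] =
      data.flatMap { case (key, value) =>
        value.grouped(chunkSize).zipWithIndex.map {
          case (chunk, i) => (s"$key#$i", chunk)
        }
      }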
If the large value comes from grouping many elements under the same key, you may want to increase the key space so that each group-by-key operation groups fewer elements together.
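Also, since your stated goal is just to count elements per key, a Combine-based aggregation (as the error message suggests) avoids materializing all grouped values for a key at once. A sketch, assuming a keyed collection `keyedData: SCollection[(String, myt)]`:

    // countByKey aggregates with a Combine under the hood, so only a running
    // count per key is shuffled and committed, never the full list of values.
    val countsPerKey: SCollection[(String, Long)] = keyedData.countByKey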
Upvotes: 6