Reputation: 285
Hi there! I'm new to Cloud Dataflow.
I'm using DataflowPipelineRunner to read a CSV file and output the result to BigQuery. It works well when the CSV file is small (only 20 records, less than 1 MB), but throws an OutOfMemoryError when the file becomes large (over 10 million records, about 616.42 MB).
Below is the error message:
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at com.google.cloud.dataflow.sdk.util.StreamUtils.getBytes(StreamUtils.java:63)
        at co.coder.MyCoder.decode(MyCoder.java:54)
        at co.coder.MyCoder.decode(MyCoder.java:1)
        at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.decodeCurrentElement(TextIO.java:1065)
        at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.readNextRecord(TextIO.java:1052)
        at com.google.cloud.dataflow.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:536)
        at com.google.cloud.dataflow.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:287)
        at com.google.cloud.dataflow.sdk.runners.worker.WorkerCustomSources$BoundedReaderIterator.advance(WorkerCustomSources.java:541)
        at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$SynchronizedReaderIterator.advance(ReadOperation.java:425)
        at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:217)
        at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
        at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
        at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:284)
        at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:220)
        at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:170)
        at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192)
        at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:172)
        at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:159)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
From the error message, the error happened at [MyCoder.java:54]. MyCoder is a subclass of CustomCoder that I implemented to convert the CSV file from Shift-JIS to UTF-8:
53:@Override
54:public String decode(InputStream inStream, Context context) throws CoderException, IOException {
55: if (context.isWholeStream) {
56: byte[] bytes = StreamUtils.getBytes(inStream);
57: return new String(bytes, Charset.forName("Shift_JIS"));
58: } else {
59: try {
60: return readString(new DataInputStream(inStream));
61: } catch (EOFException | UTFDataFormatException exn) {
62: // These exceptions correspond to decoding problems, so change
63: // what kind of exception they're branded as.
64: throw new CoderException(exn);
65: }
66: }
67:}
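For context, here is a minimal self-contained sketch of what the rest of MyCoder could look like. Only decode() above is the actual code; the of() factory, encode(), and the writeString()/readString() helpers below are illustrative assumptions, not the real implementation:

package co.coder;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
import java.nio.charset.Charset;

import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.util.StreamUtils;

public class MyCoder extends CustomCoder<String> {

    private static final Charset SHIFT_JIS = Charset.forName("Shift_JIS");

    // Factory used by TextIO.Read(...).withCoder(MyCoder.of()) in the pipeline below.
    public static MyCoder of() {
        return new MyCoder();
    }

    @Override
    public void encode(String value, OutputStream outStream, Context context)
            throws CoderException, IOException {
        byte[] bytes = value.getBytes(SHIFT_JIS);
        if (context.isWholeStream) {
            // Whole-stream context: write the raw bytes with no framing.
            outStream.write(bytes);
        } else {
            // Nested context: length-prefix the element so decode() can find its end.
            writeString(bytes, new DataOutputStream(outStream));
        }
    }

    @Override
    public String decode(InputStream inStream, Context context)
            throws CoderException, IOException {
        if (context.isWholeStream) {
            byte[] bytes = StreamUtils.getBytes(inStream);
            return new String(bytes, SHIFT_JIS);
        } else {
            try {
                return readString(new DataInputStream(inStream));
            } catch (EOFException | UTFDataFormatException exn) {
                throw new CoderException(exn);
            }
        }
    }

    // Simple length-prefixed framing for the nested-context case (an assumption).
    private static void writeString(byte[] bytes, DataOutputStream dos) throws IOException {
        dos.writeInt(bytes.length);
        dos.write(bytes);
    }

    private static String readString(DataInputStream dis) throws IOException {
        int len = dis.readInt();
        byte[] bytes = new byte[len];
        dis.readFully(bytes);
        return new String(bytes, SHIFT_JIS);
    }
}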
And also, here is how I ran the DataflowPipelineRunner:
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectId);
options.setStagingLocation(stagingFolderPathInGCS);
options.setWorkerMachineType("n1-highmem-4");
options.setMaxNumWorkers(5);
Pipeline p = Pipeline.create(options);
// read csv from gcs
PCollection<String> lines = p.apply(TextIO.Read.named("csv input")
.from("gs://" + bucketName + "/original/" + fileName).withCoder(MyCoder.of()));
lines.apply(TextIO.Write.named("csv output").to("gs://" + bucketName + "/encoded/" + fileName)
.withCoder(StringUtf8Coder.of()).withoutSharding().withHeader("test Header"));
p.run();
Since Dataflow is a scalable cloud service for big data, I am a little confused by this OOM error. Can anyone explain why the [OutOfMemoryError] happened and how to resolve it?
Thanks very much!
Upvotes: 0
Views: 885
Reputation: 285
I didn't quite understand the root cause, but I solved the problem as described below.
The OOM error with the large file (over 10 million records, about 616.42 MB) occurred because I had generated the test data by copying the small file (only 20 records, less than 1 MB) over and over, so the 10 million records contained only 20 distinct keys. I switched to different test data with many distinct keys (i.e. without so much duplicated data).
I also followed Kenn Knowles' suggestion to let Dataflow manage its jobs and instances automatically by removing the code below:
withoutSharding()
options.setWorkerMachineType("n1-highmem-4");
Finally the Dataflow job works well (the machine type automatically falls back to n1-standard-1)!
Further information about Dataflow's [Dynamic Work Rebalancing] can be found here: https://cloud.google.com/dataflow/service/dataflow-service-desc#Autotuning
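For reference, here is a sketch of the pipeline setup after those two changes (it reuses the same variables as in the question):

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectId);
options.setStagingLocation(stagingFolderPathInGCS);
// no setWorkerMachineType(...): let the Dataflow service pick the machine type
options.setMaxNumWorkers(5);

Pipeline p = Pipeline.create(options);
// read csv from gcs
PCollection<String> lines = p.apply(TextIO.Read.named("csv input")
        .from("gs://" + bucketName + "/original/" + fileName).withCoder(MyCoder.of()));
// no withoutSharding(): let the service choose the number of output shards
lines.apply(TextIO.Write.named("csv output").to("gs://" + bucketName + "/encoded/" + fileName)
        .withCoder(StringUtf8Coder.of()).withHeader("test Header"));
p.run();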
Upvotes: 1