dsteve

Reputation: 201

CombineFn Dataflow - Step not in order, creating null pointer

I am a newbie to Dataflow, so please pardon me if I have made a newbie mistake.

I recently started using Dataflow/Beam to process data from Pub/Sub, with cloud-dataflow-nyc-taxi-tycoon as a starting point, but I upgraded it to SDK 2.2.0 to make it work with Bigtable. I simulate the input with an HTTP Cloud Function that sends a single message to Pub/Sub so the pipeline can ingest it, using the code below:

 .apply("session windows on rides with early firings",
    Window.<KV<String, TableRow>>into(
      new GlobalWindows())
        .triggering(
        Repeatedly.forever(AfterPane.elementCountAtLeast(1))
        )
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.ZERO))

 .apply("group by", Combine.perKey(new LatestPointCombine()))

 .apply("prepare to big table",
   MapElements.via(new SimpleFunction<KV<String,TableRow>,TableRow >() {
     @Override
     public TableRow  apply(KV<String, TableRow> input) {
       TableRow  tableRow = null;
       try{
         tableRow=input.getValue();
         ....
       }
       catch (Exception ex){
         ex.printStackTrace();
       }
       return tableRow;
     }
   }))
 .apply....

But it gives me an error in the "group by"/CombineFn phase, after "session windows on rides with early firings". Here are the logs from Stackdriver:

1.    I  create accumulator 
2.    I  addinput 
3.    I  mergeaccumulators 
4.    I  extractoutput 
5.    I  pre mutation_transform
6.    W  Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
7.    I  mergeaccumulators 
8.    I  create accumulator 
9.    E  Uncaught exception:  
10.    E  Execution of work for S0 for key db2871226f7cec594ebd976e6758ac7e failed. Will retry locally. 
11.    I  Memory is used/total/max = 105/365/4949 MB, GC last/max = 1.00/1.00 %, #pushbacks=0, gc thrashing=false 
12.    I  create accumulator 
13.    I  addinput 
14.    I  mergeaccumulators 
15.    I  extractoutput 
16.    I  pre mutation_transform
17.    I  mergeaccumulators 
18.    I  create accumulator 
19.    E  Uncaught exception:  
20.    E  Execution of work for S0 for key db2871226f7cec594ebd976e6758ac7e failed. Will retry locally. 
21.    I  create accumulator 
...

My questions are:

A. What I don't understand is why, after step 4 (extractOutput), the mergeAccumulators method is called first (line 7.) and createAccumulator is called afterwards (line 8.). Here is the mergeAccumulators method I wrote:

public RidePoint mergeAccumulators(Iterable<RidePoint> latestList) {
  //RidePoint merged = createAccumulator();
  RidePoint merged = new RidePoint();
  LOG.info("mergeaccumulators");
  for (RidePoint latest : latestList) {
    if (latest == null) {
      LOG.info("latestnull");
    } else if (merged.rideId == null || latest.timestamp > merged.timestamp) {
      LOG.info(latest.timestamp + " latest " + latest.rideId);
      merged = new RidePoint(latest);
    }
  }
  return merged;
}

B. It seems the data is null, and I don't know what caused it, but it reaches the end of the pipeline. The "session windows on rides with early firings" step shows 1 element added, but the "group by" phase below it ... shows 52 elements added.

The detailed uncaught exception shown in the log looks like this:
    (90c7caea3f5d5ad4): java.lang.NullPointerException: in com.google.codelabs.dataflow.utils.RidePoint in string null of string in field status of com.google.codelabs.dataflow.utils.RidePoint
            org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
            org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
            org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
            org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
            com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
            com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
            com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
            com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
            com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
            com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
            com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
            com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
            com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.NullPointerException
            org.apache.avro.specific.SpecificDatumWriter.writeString(SpecificDatumWriter.java:67)
            org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:128)
            org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
            org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
            org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
            org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
            org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
            org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
            org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
            org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
            org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
            org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
            org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
            org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
            com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
            com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
            com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
            com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
            com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
            com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
            com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
            com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
            com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            java.lang.Thread.run(Thread.java:745)

Upvotes: 0

Views: 2219

Answers (1)

Scott Wegner

Reputation: 7493

Your question has many parts to it. But to start, here are some recommendations for debugging:

  1. Don't swallow exceptions. Currently your "prepare to big table" logic has: catch (Exception ex){ ex.printStackTrace(); }. This hides the exception and is what causes null elements to be returned from the function. It's better to understand and fix the exception here rather than dealing with invalid data later; see the first sketch after this list.

  2. Validate with the DirectRunner first. Make sure that your pipeline runs correctly on your machine using the Beam DirectRunner. This is the easiest way to understand and fix issues with the Beam model. You can run from the command line or from your favorite IDE and debugger; the second sketch after this list shows one way to pin the runner. Then, if your pipeline works on the DirectRunner but not on Dataflow, you know that there is a Dataflow-specific issue.
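Here is a minimal sketch of the "prepare to big table" step that fails fast instead of swallowing the exception and emitting null (the mapping logic elided in your original snippet stays a placeholder):

 .apply("prepare to big table",
   MapElements.via(new SimpleFunction<KV<String, TableRow>, TableRow>() {
     @Override
     public TableRow apply(KV<String, TableRow> input) {
       try {
         TableRow tableRow = input.getValue();
         // ... original mapping logic ...
         return tableRow;
       } catch (Exception ex) {
         // Fail the bundle so the root cause surfaces in the worker logs,
         // instead of handing a null TableRow to the next step.
         throw new RuntimeException("prepare to big table failed", ex);
       }
     }
   }))

And a minimal sketch, assuming the standard Beam 2.x options API, for pinning the pipeline to the DirectRunner while you debug locally:

 PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
 options.setRunner(org.apache.beam.runners.direct.DirectRunner.class);
 Pipeline pipeline = Pipeline.create(options);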

To touch on your specific questions:

A. What I don't understand is why, after step 4 (extractOutput), the mergeAccumulators method is called first (line 7.) and createAccumulator is called afterwards (line 8.)

Your code uses Combine.perKey, which will group elements by key value. So each unique key will cause an accumulator to be created. Dataflow also applies a set of optimizations which can parallelize and reorder independent operations, which could explain what you're seeing.
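To make the lifecycle concrete, here is a hypothetical "latest value wins" combiner over plain Long timestamps (a sketch, not your actual LatestPointCombine). The runner is free to call these methods in any order and combination that respects the contract, including merging partial accumulators from different bundles before creating a fresh accumulator elsewhere:

 import org.apache.beam.sdk.transforms.Combine;

 public class LatestTimestampFn extends Combine.CombineFn<Long, Long, Long> {
   @Override
   public Long createAccumulator() {
     // May be called many times per key, on many workers.
     return Long.MIN_VALUE;
   }

   @Override
   public Long addInput(Long accum, Long input) {
     // Fold one element into a (possibly partial) accumulator.
     return Math.max(accum, input);
   }

   @Override
   public Long mergeAccumulators(Iterable<Long> accums) {
     // Partial accumulators are merged here; some may be freshly
     // created and still "empty" (Long.MIN_VALUE).
     Long merged = createAccumulator();
     for (Long a : accums) {
       merged = Math.max(merged, a);
     }
     return merged;
   }

   @Override
   public Long extractOutput(Long accum) {
     return accum;
   }
 }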

B. It seems the data is null, and I don't know what caused it

The null values are likely those that hit an exception in your "prepare to big table" logic.

I'm not exactly sure what you mean by the output counts because I don't quite understand your pipeline topology. For example, your LatestPointCombine logic seems to output type RidePoint, but the "prepare to big table" function takes in a String. If you are still having trouble after following the suggestions above, you can post a Dataflow job_id and I can help investigate further.
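As a hypothetical illustration of that mismatch (assuming LatestPointCombine outputs a RidePoint), the downstream function would need to accept KV<String, RidePoint> and do the conversion to a TableRow itself:

 .apply("group by", Combine.perKey(new LatestPointCombine()))  // emits KV<String, RidePoint>
 .apply("prepare to big table",
   MapElements.via(new SimpleFunction<KV<String, RidePoint>, TableRow>() {
     @Override
     public TableRow apply(KV<String, RidePoint> input) {
       RidePoint latest = input.getValue();
       TableRow row = new TableRow();
       // Populate row from the RidePoint's fields (rideId, timestamp, status, ...).
       return row;
     }
   }))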

Upvotes: 2
