Javier Roda

Reputation: 113

Why am I getting java.lang.IllegalStateException on Google Dataflow?

I've upgraded to the new Google Dataflow SDK version 1.6, and when I test on my local machine I get a java.lang.IllegalStateException at the end of my pipeline. I didn't have this problem with version 1.5.1.

This doesn't occur in the live environment, only locally. Is it a bug in the new version? Do I need to change my code to avoid these errors?

I've attached part of my pipeline to help track down the problem.

private static void getTableRowAndWrite(final PCollection<KV<Integer, Iterable<byte[]>>> groupedTransactions, final String tableName) {
    // Get the tableRow element from the PCollection
    groupedTransactions
            .apply(ParDo
                    .of(((tableName.equals("avail")) ? new GetTableRowAvail() : new GetTableRowReservation())) //Get a TableRow
                    .named("Get " + tableName + " TableRows"))
            .apply(BigQueryIO
                    .Write
                    .named("Write to BigQuery " + tableName) //Write to BigQuery
                    .withSchema(createTableSchema())
                    .to((SerializableFunction<BoundedWindow, String>) window -> {
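                        // For an interval window, window.toString() looks like
                        // "[2016-06-20T00:00:00.000Z..2016-06-21T00:00:00.000Z)",
                        // so the lines below extract the window start as yyyyMMdd.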
                        String date = window.toString();
                        String date2 = date.substring(1, 5) + date.substring(6, 8) + date.substring(9, 11);
                        return "travelinsights-1056:hotel." + tableName + "_full_" + (TEST ? "test_" : "") + date2;
                    })
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            );
}

The error is:

Exception in thread "main" java.lang.IllegalStateException: Cleanup time 294293-06-23T12:00:54.774Z is beyond end-of-time
at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
at com.google.cloud.dataflow.sdk.util.ReduceFnRunner.onTimer(ReduceFnRunner.java:642)
at com.google.cloud.dataflow.sdk.util.BatchTimerInternals.advance(BatchTimerInternals.java:134)
at com.google.cloud.dataflow.sdk.util.BatchTimerInternals.advanceInputWatermark(BatchTimerInternals.java:110)
at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:91)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229)
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098)
at com.google.cloud.dataflow.sdk.transforms.ParDo.access$300(ParDo.java:457)
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1084)
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1079)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)

Upvotes: 2

Views: 447

Answers (1)

Kenn Knowles

Reputation: 6033

You found a bug!

This has been filed as BEAM-341, and the fix is in review as #464, which will be ported to the Dataflow Java SDK immediately after review.

Without seeing the code that sets up windowing, triggering, and allowed lateness, I can't be certain how this affects you. But there is a simple workaround that applies if you have non-global windowing and an extremely large allowed lateness, so that the window does not expire until the "end of time". In that case, you can update your job with an allowed lateness that is merely very large (hundreds of years, say) instead of effectively infinite, as sketched below.
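For illustration, a minimal sketch of that workaround against the Dataflow 1.x Java SDK. The helper name, the daily window size, and the ~200-year lateness are assumptions for the example, not taken from the question; only the idea of capping allowed lateness at a finite value matters.

import org.joda.time.Duration;

import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

// Hypothetical helper: apply non-global windowing with a large but
// finite allowed lateness. Daily fixed windows are an assumption here.
private static PCollection<KV<Integer, Iterable<byte[]>>> windowWithFiniteLateness(
        final PCollection<KV<Integer, Iterable<byte[]>>> transactions) {
    return transactions.apply(
            Window.<KV<Integer, Iterable<byte[]>>>into(FixedWindows.of(Duration.standardDays(1)))
                    // Very large but finite (~200 years) instead of effectively
                    // infinite, so the cleanup time (window end plus allowed
                    // lateness) stays before the end-of-time timestamp.
                    .withAllowedLateness(Duration.standardDays(365L * 200)));
}

With a finite lateness, the cleanup timer no longer lands beyond end-of-time, which is the precondition that fails in ReduceFnRunner.onTimer in the stack trace above.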

Upvotes: 3
