Reputation: 672
String sql = "select CUSTOMER_ID, CITY, CUSTOMER_FIRSTTNAME from LIMA.CHECK_FLOW where CUSTOMER_ID < 1000";
Schema schema = Schema.of(
    Schema.Field.of("CUSTOMER_ID", Schema.FieldType.INT32),
    Schema.Field.of("CITY", Schema.FieldType.STRING),
    Schema.Field.of("CUSTOMER_FIRSTTNAME", Schema.FieldType.STRING)
);
PCollection<Row> result = pipe.apply(JdbcIO.readRows()
        .withDataSourceConfiguration(configuration)
        .withQuery(sql)
        .withFetchSize(1000)
    ).setCoder(RowCoder.of(schema)).setRowSchema(schema);
PipelineResult nRes = pipe.run();
We are testing out Apache Beam to see if it fits our use cases. Above is a simple example. I get the following error:
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:122)
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:59)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:208)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
at org.apache.beam.sdk.io.jdbc.LogicalTypes$FixedPrecisionNumeric.toInputType(LogicalTypes.java:268)
at org.apache.beam.sdk.io.jdbc.LogicalTypes$FixedPrecisionNumeric.toInputType(LogicalTypes.java:246)
at org.apache.beam.sdk.io.jdbc.SchemaUtil.lambda$createLogicalTypeExtractor$ca0ab2ec$1(SchemaUtil.java:279)
at org.apache.beam.sdk.io.jdbc.SchemaUtil$BeamRowMapper.mapRow(SchemaUtil.java:344)
at org.apache.beam.sdk.io.jdbc.SchemaUtil$BeamRowMapper.mapRow(SchemaUtil.java:322)
at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:924)
Any help to resolve the error will be deeply appreciated.
Upvotes: 0
Views: 224
Reputation: 573
I was facing the same issue, and it is resolved as part of this pull request.
The release containing the fix should be out around January 2022. For now, you could apply the changes from this PR to your own project by creating the org/apache/beam/sdk/io/jdbc/LogicalTypes.java class and copying the contents of this file into it. I was given the same advice.
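For intuition only: the stack trace ends in a Preconditions.checkArgument call inside LogicalTypes$FixedPrecisionNumeric.toInputType, which suggests a precision/scale check on a BigDecimal value is failing. I have not verified the exact condition in the Beam source, so the sketch below is an assumption about the kind of check involved, not Beam's actual code. The class name PrecisionCheckSketch, both helper methods, and the NUMERIC(10, 0) column definition are hypothetical.

```java
import java.math.BigDecimal;

public class PrecisionCheckSketch {

    // Hypothetical strict check: the value must have exactly the declared
    // precision and scale. This is an assumed failure mode, not Beam's code.
    public static boolean strictMatch(BigDecimal value, int precision, int scale) {
        return value == null
            || (value.precision() == precision && value.scale() == scale);
    }

    // Hypothetical lenient check: the value only has to fit within the
    // declared precision and scale.
    public static boolean fitsWithin(BigDecimal value, int precision, int scale) {
        return value == null
            || (value.precision() <= precision && value.scale() <= scale);
    }

    public static void main(String[] args) {
        // Assumed column definition NUMERIC(10, 0) holding the id 42.
        BigDecimal id = new BigDecimal("42");
        System.out.println("precision=" + id.precision() + ", scale=" + id.scale());
        System.out.println("strictMatch=" + strictMatch(id, 10, 0));
        System.out.println("fitsWithin=" + fitsWithin(id, 10, 0));
    }
}
```

The value 42 has precision 2 and scale 0, so a strict equality check against a declared precision of 10 rejects it, while a "fits within" check accepts it. If your CUSTOMER_ID column is a NUMERIC/DECIMAL type, something along these lines may be what trips the IllegalArgumentException.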
Upvotes: 1