Reputation: 157
I have the following Apache Beam pipeline:
package ch.mycompany.bb8;
import ch.mycompany.bb8.transforms.LogRecords;
import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Bb8Pipeline {

  private static final Logger LOG = LoggerFactory.getLogger(Bb8Pipeline.class);

  /**
   * JSON definition of the Avro record schema ("User" in namespace
   * "com.google.cloud.pso") with the fields name, surname, age and retired.
   */
  private static final String USER_SCHEMA_JSON =
      "{\"type\": \"record\","
          + "\"namespace\": \"com.google.cloud.pso\","
          + "\"name\": \"User\","
          + "\"fields\": ["
          + "{\"name\": \"name\",\"type\": \"string\"},"
          + "{\"name\": \"surname\",\"type\": \"string\"},"
          + "{\"name\": \"age\",\"type\": \"int\"},"
          + "{\"name\": \"retired\",\"type\": \"boolean\"}"
          + "]}";

  /**
   * Builds the pipeline from the given options and starts it.
   *
   * <p>The pipeline reads Avro generic records from the configured Pub/Sub input
   * subscription, passes them through the {@code LogRecords} step, and writes the
   * resulting strings to the configured Pub/Sub output topic.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(CustomOptions options) {
    final Pipeline beamPipeline = Pipeline.create(options);

    // Parse the schema and log it so the effective definition shows up in the job output.
    final Schema userSchema = new Schema.Parser().parse(USER_SCHEMA_JSON);
    LOG.info(userSchema.toString());

    beamPipeline
        .apply(
            "Read PubSub record strings",
            PubsubIO.readAvroGenericRecords(userSchema)
                .fromSubscription(options.getInputSubscription()))
        .apply("Simply log records", ParDo.of(new LogRecords()))
        .apply(
            "Write PubSub records",
            PubsubIO.writeStrings().to(options.getOutputTopic()));

    return beamPipeline.run();
  }

  /**
   * Command-line entry point: parses and validates the arguments, forces
   * streaming mode, and runs the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    final CustomOptions pipelineOptions =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(CustomOptions.class);
    pipelineOptions.setStreaming(true);
    run(pipelineOptions);
  }
}
I run the pipeline using maven as follows:
mvn compile exec:java \
-Dexec.mainClass=ch.mycompany.bb8.Bb8Pipeline \
-Dexec.args="--project=t2-prod \
--stagingLocation=gs://bb-8-staging/staging/ \
--tempLocation=gs://bb-8-staging/staging/ \
--runner=DataflowRunner \
--region=europe-west1 \
--jobName=bb-8-avro-test \
--outputTopic=projects/t2-prod/topics/bb-8-output \
--inputSubscription=projects/t2-prod/subscriptions/bb-8-ingest \
--maxNumWorkers=1"
And I get the following null pointer exception:
INFO: {"type":"record","name":"User","namespace":"com.google.cloud.pso","fields":[{"name":"name","type":"string"},{"name":"surname","type":"string"},{"name":"age","type":"int"},{"name":"retired","type":"boolean"}]}
[WARNING]
java.lang.NullPointerException
at java.util.concurrent.ConcurrentHashMap.get (ConcurrentHashMap.java:936)
at java.util.concurrent.ConcurrentHashMap.containsKey (ConcurrentHashMap.java:964)
at org.apache.avro.LogicalTypes.fromSchemaImpl (LogicalTypes.java:73)
at org.apache.avro.LogicalTypes.fromSchema (LogicalTypes.java:47)
at org.apache.beam.sdk.schemas.utils.AvroUtils.toFieldType (AvroUtils.java:673)
at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamField (AvroUtils.java:290)
at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamSchema (AvroUtils.java:313)
at org.apache.beam.sdk.schemas.utils.AvroUtils.getSchema (AvroUtils.java:415)
at org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.readAvroGenericRecords (PubsubIO.java:592)
at ch.mycompany.bb8.Bb8Pipeline.run (Bb8Pipeline.java:68)
at ch.mycompany.bb8.Bb8Pipeline.main (Bb8Pipeline.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:748)
As seen in the log output above, the schema is parsed and logged as expected, so the schema itself isn't null.
Does anyone know how to fix this error, or how I can debug further?
mvn -version
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-24T20:41:47+02:00)
Maven home: /opt/apache-maven
Java version: 1.8.0_191, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-oracle/jre
Default locale: en_ZA, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-88-generic", arch: "amd64", family: "unix"
Beam version 2.19.0
org.apache.avro version 1.8.0
Upvotes: 1
Views: 498
Reputation: 159
This appears to be a dependency conflict related issue:
Beam 2.19.0 depends on Avro 1.8.2 (link), which has the correct implementation (see this line) and thus will not cause the problem.
But you mentioned that you use Avro 1.8.0, which has the incorrect implementation (see this line) that may throw the NullPointerException in LogicalTypes.fromSchemaImpl.
So an easy fix for this problem is to bump the Avro version you use to 1.8.2.
Upvotes: 1