Reputation: 61
I have a requirement to read JSON-serialized messages from a Kafka topic, convert them to Parquet, and persist them in S3.
Background
The official S3 Sink Connector supports the Parquet output format, but:
You must use the AvroConverter, ProtobufConverter, or JsonSchemaConverter with ParquetFormat for this connector. Attempting to use the JsonConverter (with or without schemas) results in a NullPointerException and a StackOverflowException.
Problem Statement
So, I'm looking for a way to read messages from a Kafka topic that were originally written in JSON format, somehow convert them to JSON Schema format and then plug them into the S3 connector that will write to S3 in Parquet format.
Alternatively, I'm open to other solutions (ideally ones that don't involve writing Java code) that meet the main requirement: take Kafka messages and put them in S3 as Parquet files. Thanks!
PS: Changing the way these Kafka messages are originally written (such as using JSON Schema serialization with Schema Discovery) is unfortunately not an option for me at this time.
Upvotes: 1
Views: 2152
Reputation: 3
If the producer doesn't have a schema registry, there are two ways you could achieve this. One of them is:
"format.class":"io.confluent.connect.s3.format.parquet.ParquetFormat", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schemas.enable": "true", "value.converter.schema.registry.url":"<your-schema-registry-url>"
Upvotes: 0
Reputation: 191743
In general, your data is required to have a schema because Parquet needs one (the S3 Parquet writer translates to Avro as an intermediate step).
You could look into using this Connect transform, which takes a JSON Schema and attempts to apply it to the record (see its tests). Since it returns a Struct object, you can then try to use JsonSchemaConverter as part of the sink.
But if you are just throwing random JSON data into a single topic without any consistent fields or values, then you'll have a hard time applying any schema.
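To make that concrete, here is a hedged sketch of how such a transform could be wired into the sink, assuming an SMT along the lines of the FromJson transform from the kafka-connect-json-schema project; the transform class name, its json.schema.* properties, and the converter choice are assumptions rather than details given in this answer, so check the transform's own documentation:

# Sketch only: apply a JSON Schema to raw JSON records before the Parquet writer.
# The transforms.* class and property names below are assumed from the
# kafka-connect-json-schema (FromJson) SMT and may differ; verify against its docs.
name=s3-parquet-sink-with-schema
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=my-json-topic
s3.bucket.name=my-bucket
s3.region=us-east-1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
flush.size=1000
# Assumption: read each message as a plain string so the SMT can parse the JSON itself.
value.converter=org.apache.kafka.connect.storage.StringConverter
transforms=fromJson
transforms.fromJson.type=com.github.jcustenborder.kafka.connect.json.FromJson$Value
transforms.fromJson.json.schema.location=Url
transforms.fromJson.json.schema.url=file:///path/to/your-schema.json

Whatever the exact transform, the point is the same: by the time ParquetFormat sees a record, it has to carry a Connect schema (a Struct), because the Parquet writer derives its output schema from that.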
Upvotes: 0