MusKaya

Reputation: 61

Kafka Connect: Read JSON serialized Kafka message, convert to Parquet format and persist in S3

I have a requirement to read JSON serialized messages from a Kafka topic, convert them to Parquet and persist in S3.

Background

The official S3-Sink-Connector supports Parquet output format but:

You must use the AvroConverter, ProtobufConverter, or JsonSchemaConverter with ParquetFormat for this connector. Attempting to use the JsonConverter (with or without schemas) results in a NullPointerException and a StackOverflowException.

And the JsonSchemaConverter throws an error if the message was not written using JSON Schema serialization.

Problem Statement

So, I'm looking for a way to read messages from a Kafka topic that were originally written as plain JSON, somehow convert them to the JSON Schema format, and then feed them into the S3 connector so it can write them to S3 in Parquet format.

Alternatively, I'm also open to other solutions (ideally ones that don't involve writing Java code) that satisfy the main requirement: take Kafka messages and land them in S3 as Parquet files. Thanks!

PS: Changing the way that these Kafka messages are written originally (such as using JSON Schema serialization with Schema Discovery) unfortunately is not an option for me at this time.

Upvotes: 1

Views: 2152

Answers (2)

Ravinder Kumar

Reputation: 3

If the producer doesn't use a schema registry, there are two ways you could achieve this. One of them is:

  1. Spin up Schema Registry and KSQL. Create a base_stream that declares the schema of the JSON messages, then create another stream as SELECT * FROM base_stream, declaring WITH (VALUE_FORMAT='AVRO') (see the sketch after this list).
  2. If KSQL and Schema Registry are configured properly, KSQL will automatically register the schema for the derived stream because you declared VALUE_FORMAT='AVRO'.
  3. Use the S3 sink connector with the Avro converter and Parquet format; Kafka Connect must also be connected to Schema Registry for this to work. Some important properties (see the connector config sketch below): "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schemas.enable": "true", "value.converter.schema.registry.url": "<your-schema-registry-url>"
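As an illustrative sketch only: the stream names, topic name, and field list below are made up and need to be adapted to your actual JSON payload. The first two statements would run in KSQL; the derived Avro stream gets its schema registered in Schema Registry automatically.

    -- Hypothetical topic and columns; adjust to your actual JSON messages.
    CREATE STREAM base_stream (
        id VARCHAR,
        amount DOUBLE,
        created_at VARCHAR
      ) WITH (
        KAFKA_TOPIC = 'my-json-topic',
        VALUE_FORMAT = 'JSON'
      );

    -- Re-serialize the same data as Avro; KSQL registers the schema
    -- in Schema Registry for the new stream's backing topic.
    CREATE STREAM json_as_avro
      WITH (VALUE_FORMAT = 'AVRO') AS
      SELECT * FROM base_stream;

The S3 sink then reads the Avro-backed topic (by default KSQL names it after the stream, in uppercase). A minimal connector config along the lines of step 3 might look like this; the bucket, region, topic name, and flush.size are placeholders:

    {
      "name": "s3-parquet-sink",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "JSON_AS_AVRO",
        "s3.bucket.name": "my-bucket",
        "s3.region": "us-east-1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "<your-schema-registry-url>",
        "flush.size": "1000"
      }
    }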

Upvotes: 0

OneCricketeer

Reputation: 191743

In general, your data is required to have a schema because Parquet needs one (the S3 Parquet writer translates to Avro as an intermediate step).

You could look into using this Connect transform, which takes a schema and attempts to apply it to the JSON data (see its tests). Since it returns a Struct object, you can then try to use the JsonSchemaConverter as part of the sink.
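For what it's worth, here is a rough sketch of what that sink side could look like. The transform alias and class name are placeholders for the transform mentioned above, and the topic, bucket, and registry URL are made up. Note that in a sink, converters run before transforms, so whether this exact combination works for your raw JSON needs to be verified; treat it as a starting point rather than a known-working config.

    {
      "name": "s3-parquet-sink",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "my-json-topic",
        "s3.bucket.name": "my-bucket",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
        "transforms": "applySchema",
        "transforms.applySchema.type": "<fully.qualified.SchemaApplyingTransform>",
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "<your-schema-registry-url>",
        "flush.size": "1000"
      }
    }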

But if you are just throwing random JSON data into a single topic without any consistent fields or values, then you'll have a hard time applying any schema to it.

Upvotes: 0
