Yrah

Reputation: 113

Kafka topic data to HDFS parquet file using HDFS sink connector configuration issue

I need help regarding a Kafka topic that I would like to put into HDFS in Parquet format (with a daily partitioner).

I have a lot of data in a Kafka topic, which is basically JSON data like this:

{"title":"Die Hard","year":1988,"cast":["Bruce Willis","Alan Rickman","Bonnie Bedelia","William Atherton","Paul Gleason","Reginald VelJohnson","Alexander Godunov"],"genres":["Action"]}
{"title":"Toy Story","year":1995,"cast":["Tim Allen","Tom Hanks","(voices)"],"genres":["Animated"]}
{"title":"Jurassic Park","year":1993,"cast":["Sam Neill","Laura Dern","Jeff Goldblum","Richard Attenborough"],"genres":["Adventure"]}
{"title":"The Lord of the Rings: The Fellowship of the Ring","year":2001,"cast":["Elijah Wood","Ian McKellen","Liv Tyler","Sean Astin","Viggo Mortensen","Orlando Bloom","Sean Bean","Hugo Weaving","Ian Holm"],"genres":["Fantasy »]}
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}

This topic's name is: test

I would like to put this data into my HDFS cluster in Parquet format, but I'm struggling with the sink connector configuration. I use the Confluent hdfs-sink-connector for that.

Here is what I have managed to do so far:

{
  "name": "hdfs-sink",
  "config": {
    "name": "hdfs-sink",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "hdfs.url": "hdfs://hdfs-IP:8020",
    "hadoop.home": "/user/test-user/TEST",
    "flush.size": "3",
    "locale": "fr-fr",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
    "consumer.auto.offset.reset": "earliest",
    "value.converter":  "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"
    
  }
}

Some explanation of why I configured the connector like that:

I understood that I might have to use the Schema Registry to format the data into Parquet, but I don't know how to do that. Is it even necessary?

Can you please help me with that?

Thank you

Upvotes: 0

Views: 2456

Answers (1)

OneCricketeer

Reputation: 191973

I have not personally used the ParquetFormat, but your data must have a schema, which means one of the following:

  1. Your data is produced using the Confluent Avro serializer
  2. Your data is produced as Protobuf and you add the Protobuf converter to your Connect workers
  3. You use Kafka Connect's special JSON format that includes a schema within each record (see the example below)
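
For illustration, option 3 means each record value carries its own schema alongside the payload. A minimal sketch, covering only the title and year fields (the "movie" name is arbitrary, and your array fields would need additional array-typed entries in the schema):

{
  "schema": {
    "type": "struct",
    "name": "movie",
    "fields": [
      { "field": "title", "type": "string", "optional": false },
      { "field": "year", "type": "int32", "optional": false }
    ]
  },
  "payload": {
    "title": "Die Hard",
    "year": 1988
  }
}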

Basically, it cannot be "plain JSON". In other words, you currently have "value.converter.schemas.enable": "true", and I'm guessing your connector isn't working because your records are not in one of the above formats.

Without a schema, the JSON parser cannot possibly know what "columns" Parquet needs to write.
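
For example, if you went with option 1 and re-produced the topic with the Confluent Avro serializer, the converter section of your connector config would look roughly like this (http://schema-registry:8081 is a placeholder for your own Schema Registry URL, and the key converter shown is just an assumption):

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"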


Also, the DailyPartitioner does not create one file per day, only a directory. You will get one file per flush.size, and there is also a configuration for scheduled rotation intervals for flushing files. In addition, there will be one file per Kafka partition.
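
As a rough sketch, file rotation in the HDFS sink is driven by settings like these (flush.size counts records, the rotate settings are in milliseconds, and the values here are only examples, not recommendations):

"flush.size": "1000",
"rotate.interval.ms": "600000",
"rotate.schedule.interval.ms": "600000"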


Also, "consumer.auto.offset.reset": "earliest", only works in the connect-distribtued.properties file, not on a per-connector bases, AFAIK.


Since I haven't personally used the ParquetFormat, that's all the advice I can give, but I have used other tools, such as NiFi, for similar goals, which would allow you to keep your existing Kafka producer code unchanged.


Alternatively, use JsonFormat instead; however, Hive integration will not work automatically, and the tables must be pre-defined (which requires you to have a schema for your topic anyway).
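
If you try that, it should only be a one-line change in your connector config (assuming the JSON format class that ships with the HDFS connector):

"format.class": "io.confluent.connect.hdfs.json.JsonFormat"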


Another option is to just configure Hive to read from Kafka directly.

Upvotes: 1
