Reputation: 113
I need help regarding a Kafka topic that I would like to write to HDFS in Parquet format (with a daily partitioner).
I have a lot of data in a Kafka topic, which is basically JSON data like this:
{"title":"Die Hard","year":1988,"cast":["Bruce Willis","Alan Rickman","Bonnie Bedelia","William Atherton","Paul Gleason","Reginald VelJohnson","Alexander Godunov"],"genres":["Action"]}
{"title":"Toy Story","year":1995,"cast":["Tim Allen","Tom Hanks","(voices)"],"genres":["Animated"]}
{"title":"Jurassic Park","year":1993,"cast":["Sam Neill","Laura Dern","Jeff Goldblum","Richard Attenborough"],"genres":["Adventure"]}
{"title":"The Lord of the Rings: The Fellowship of the Ring","year":2001,"cast":["Elijah Wood","Ian McKellen","Liv Tyler","Sean Astin","Viggo Mortensen","Orlando Bloom","Sean Bean","Hugo Weaving","Ian Holm"],"genres":["Fantasy »]}
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}
This topic's name is: test
And I would like to put this data into my HDFS cluster in Parquet format, but I am struggling with the sink connector configuration. I use the Confluent HDFS sink connector for that.
Here is what I have managed to do so far:
{
  "name": "hdfs-sink",
  "config": {
    "name": "hdfs-sink",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "hdfs.url": "hdfs://hdfs-IP:8020",
    "hadoop.home": "/user/test-user/TEST",
    "flush.size": "3",
    "locale": "fr-fr",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
    "consumer.auto.offset.reset": "earliest",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"
  }
}
Some explanation of why I configured the connector like that:
I understood that maybe I have to use the Schema Registry to format the data into Parquet, but I don't know how to do that. Is it necessary?
Can you please help me with that?
Thank you
Upvotes: 0
Views: 2456
Reputation: 191973
I have not personally used the ParquetFormat, but your data must have a schema, which means one of the following: either you are using the Schema Registry with a schema-aware converter (Avro, for example), or your JSON records are envelopes that contain a "schema" field and a "payload" field.
Basically, it cannot be "plain JSON". You currently have "value.converter.schemas.enable": "true", and I'm guessing your connector isn't working because your records are not in that envelope format.
Without a schema, the JSON parser cannot possibly know which "columns" Parquet needs to write.
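For reference, for the JsonConverter with schemas.enable=true to attach a schema at all, each record would have to look roughly like this (a minimal sketch based on your sample data; the types, optional flags and the struct name are my assumptions):

{
  "schema": {
    "type": "struct",
    "name": "movie",
    "optional": false,
    "fields": [
      { "field": "title", "type": "string", "optional": false },
      { "field": "year", "type": "int32", "optional": true },
      { "field": "cast", "type": "array", "items": { "type": "string", "optional": true }, "optional": true },
      { "field": "genres", "type": "array", "items": { "type": "string", "optional": true }, "optional": true }
    ]
  },
  "payload": {
    "title": "Die Hard",
    "year": 1988,
    "cast": ["Bruce Willis", "Alan Rickman"],
    "genres": ["Action"]
  }
}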
And the DailyPartitioner does not create one file per day, only a directory per day. You will get one file per flush.size worth of records, and there is also a configuration for rotating files on a scheduled interval. In addition, there will be at least one file per Kafka partition.
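For example, something like this in the connector config would flush on a time schedule as well as on record count (a rough sketch; the values are arbitrary and should be tuned to your volume):

"flush.size": "1000",
"rotate.schedule.interval.ms": "3600000"

As far as I remember, rotate.schedule.interval.ms also requires the timezone setting, which you already have.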
Also, "consumer.auto.offset.reset": "earliest",
only works in the connect-distribtued.properties
file, not on a per-connector bases, AFAIK.
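That is, something like this would go into the worker's connect-distributed.properties (followed by a worker restart), rather than into the connector JSON:

consumer.auto.offset.reset=earliest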
Since I haven't personally used the ParquetFormat, that's all the advice I can give, but I have used other tools, like NiFi, for similar goals, which would let you keep your existing Kafka producer code unchanged.
Alternatively, use JsonFormat instead; however, the automatic Hive integration will not work, and the tables must be pre-defined (which would require you to have a schema for your topic anyway).
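If you go that route, the relevant part of the config would look something like this (class name from memory, so double-check it against your connector version), and you could then disable the schema requirement on the converter:

"format.class": "io.confluent.connect.hdfs.json.JsonFormat",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"

That writes the records out as plain JSON files instead of Parquet.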
And another option is to just configure Hive to read from Kafka directly.
Upvotes: 1