Reputation: 486
I am relatively new to Hive/Hadoop.
I was reading through the Hive Storage Handlers documentation.
Now I am trying to write a custom implementation of HiveStorageHandler for querying and pushing messages to Kafka using a Hive Table.
I saw that there are other implementations of HiveStorageHandler that let us query and write to NoSQL databases using Hive tables.
I am trying to replicate that for Kafka. I found a project that does part of this:
HiveKa - query Kafka using Hive
There they read data from Kafka by querying a Hive table. I want to go the other way and write to a Kafka topic by running an insert on the table.
Can someone please guide me on this?
Upvotes: 0
Views: 1198
Reputation: 1743
I wish to write on the kafka topic using insert on the table.
This is possible using the Kafka HiveStorageHandler (org.apache.hadoop.hive.kafka.KafkaStorageHandler).
The example below reads from one Kafka-backed table and inserts the rows into another, which is the write path you are asking about.
First create two external tables for source and destination Kafka topics.
create external table if not exists source_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='source_topic_name',
'kafka.bootstrap.servers'=''
);
create external table if not exists target_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='target_topic_name',
'kafka.bootstrap.servers'=''
);
Then use a MERGE query to insert data into the target Kafka topic:
merge into target_topic_table
using (
select
<columns>,
cast(null as binary) as `__key`,
cast(null as int) as `__partition`,
cast(-1 as bigint) as `__offset`,
cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
from source_topic_table
) sub
on
sub.column_name = target_topic_table.column_name <Some condition>
when not matched then insert values
(
<sub.columns>,
sub.`__key`,sub.`__partition`,sub.`__offset`,sub.`__timestamp`
);
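If no matching against existing rows is needed, a plain INSERT ... SELECT may be enough. The following is a minimal sketch, not a tested recipe: it reuses the two tables defined above, and the payload columns `id` and `payload` are hypothetical stand-ins for your real <fields>. The four metadata columns are filled with the same placeholder values as in the MERGE query.

```sql
-- Hypothetical payload columns: id, payload (replace with the real <fields>).
INSERT INTO TABLE target_topic_table
SELECT
  id,
  payload,
  -- Kafka metadata columns, same placeholder values as in the MERGE query
  CAST(NULL AS binary) AS `__key`,
  CAST(NULL AS int)    AS `__partition`,
  CAST(-1 AS bigint)   AS `__offset`,
  CAST(to_epoch_milli(current_timestamp) AS bigint) AS `__timestamp`
FROM source_topic_table;
```

The source does not have to be a Kafka-backed table; any Hive table with compatible columns could appear in the FROM clause.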
Note:
A Hive external, non-native table is used.
In addition to the user-defined payload schema, the Kafka storage handler appends 4 additional columns (__key, __partition, __offset, __timestamp), which users can use to query the Kafka metadata fields.
Users have to set the 'kafka.serde.class' table property if the data is not in the handler's default format (JSON, per the Hive Kafka storage handler documentation).
Users can also set the 'kafka.write.semantic' table property, which accepts NONE, AT_LEAST_ONCE, or EXACTLY_ONCE.
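As a sketch of how those two properties might be set on an existing table: the Avro SerDe class below is only an illustration of a non-default payload format, and EXACTLY_ONCE is picked arbitrarily from the three allowed values.

```sql
-- Assumes target_topic_table from the example above already exists.
-- Illustrative choice: treat the topic payload as Avro.
ALTER TABLE target_topic_table
SET TBLPROPERTIES ('kafka.serde.class' = 'org.apache.hadoop.hive.serde2.avro.AvroSerDe');

-- Delivery guarantee for writes: NONE, AT_LEAST_ONCE, or EXACTLY_ONCE.
ALTER TABLE target_topic_table
SET TBLPROPERTIES ('kafka.write.semantic' = 'EXACTLY_ONCE');
```

The same keys can instead go directly into the TBLPROPERTIES clause of the CREATE EXTERNAL TABLE statement.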
Upvotes: 1
Reputation: 191973
If I understand correctly, you want to read events from Hive, and push to Kafka. I don't have experience with the storage handlers, but I would rather suggest writing the appropriate code to produce to Kafka, then feed those events into Hadoop/Hive.
Within Kafka there is a framework called Kafka Connect that writes to external systems. Confluent has written a connector for HDFS that offers Hive support by updating the Hive metastore whenever a file is written into HDFS.
Without writing a storage handler, you can try using the JDBC Source connector, or otherwise Spark/Flink to read that data from Hive and push into Kafka.
Generally, though, Hadoop is the destination for CDC events, not a source that generates them, mostly because it is slow to query. If you wanted to create events on inserts, that would generally require some kind of table scan, so generating events from Cassandra/HBase might be a better option.
Upvotes: 0