raizsh

Reputation: 486

Kafka producer using HiveStorageHandler

I am relatively new to Hive/Hadoop.

I was reading through the Hive Storage Handlers documentation.

Now I am trying to write a custom implementation of HiveStorageHandler for querying and pushing messages to Kafka using a Hive table.

I saw that there are other implementations of HiveStorageHandler that let us query and write to NoSQL databases using Hive tables.

I am trying to replicate that for Kafka, and I found a project on it:

HiveKa - query Kafka using Hive

Here they are trying to read data from Kafka using queries on the Hive table. I wish to write to the Kafka topic using an insert on the table.

Can someone please guide me on this?

Upvotes: 0

Views: 1198

Answers (2)

arunkvelu

Reputation: 1743

I wish to write to the Kafka topic using an insert on the table.

This is possible using the Kafka storage handler (org.apache.hadoop.hive.kafka.KafkaStorageHandler). Below are the general use cases possible with this feature:

  1. Query Kafka topics
  2. Query data from Kafka topics and insert into a Hive managed/external table (a sketch of this one follows the list)
  3. Query data from Kafka topics and push into other Kafka topics
  4. Query data from a Hive external/managed table and push into Kafka topics
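
For instance, the 2nd use case is just an insert-select from the Kafka-backed table into an ordinary Hive table. A minimal sketch, assuming the source_topic_table created further below and a hypothetical managed table named events_managed:

-- hypothetical managed table with a schema matching the Kafka payload
create table if not exists events_managed
(
  <fields>
)
stored as orc;

-- copy the Kafka payload into the managed table; the metadata columns can be left out here
insert into table events_managed
select <columns>
from source_topic_table;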

You are trying to do the 3rd use case.

First, create two external tables for the source and destination Kafka topics.

create external table if not exists source_topic_table
(
  <fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  'kafka.topic'='source_topic_name',
  'kafka.bootstrap.servers'=''
);


create external table if not exists target_topic_table
(
  <fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  'kafka.topic'='target_topic_name',
  'kafka.bootstrap.servers'=''
);
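
To sanity-check the setup, you can query the Kafka-backed table directly and look at the metadata columns the storage handler appends (a minimal sketch; the limit and column list are only illustrative):

-- read a few records plus the Kafka metadata columns
select
  <columns>,
  `__key`,
  `__partition`,
  `__offset`,
  `__timestamp`
from source_topic_table
limit 10;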

Then use a merge query to insert data into the target Kafka topic:

merge into target_topic_table
using (
  select
    <columns>,
    cast(null as binary) as `__key`,
    cast(null as int) as `__partition`,
    cast(-1 as bigint) as `__offset`,
    cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
  from source_topic_table
) sub
on
  sub.column_name = target_topic_table.column_name <Some condition>
when not matched then insert values
(
  <sub.columns>,
  sub.`__key`, sub.`__partition`, sub.`__offset`, sub.`__timestamp`
);
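
If you do not need the matching condition of a merge, a plain insert-select may be enough. This is a minimal sketch under the assumption that your Hive version supports direct inserts into Kafka-backed tables; the metadata columns are filled in the same way as in the merge above:

-- push every source record to the target topic, letting Kafka assign key/partition
insert into table target_topic_table
select
  <columns>,
  cast(null as binary) as `__key`,
  cast(null as int) as `__partition`,
  cast(-1 as bigint) as `__offset`,
  cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
from source_topic_table;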

Notes:

  1. A Hive external non-native table is used

  2. In addition to the user-defined payload schema, the Kafka storage handler appends four additional columns (__key, __partition, __offset, __timestamp) which users can use to query the Kafka metadata fields

  3. Users have to set the 'kafka.serde.class' table property if the data is not in CSV format

  4. Users can also set the 'kafka.write.semantic' table property, which accepts NONE, AT_LEAST_ONCE, or EXACTLY_ONCE as values (see the sketch after these notes)
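
For notes 3 and 4, the properties can be added to the TBLPROPERTIES clause at creation time or set afterwards. A minimal sketch; the serde class shown is only an assumption for JSON payloads, so adjust it to your data format:

-- hypothetical example: switch the payload format to JSON and tighten the delivery semantics
alter table target_topic_table set tblproperties (
  'kafka.serde.class'='org.apache.hadoop.hive.serde2.JsonSerDe',
  'kafka.write.semantic'='EXACTLY_ONCE'
);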

Upvotes: 1

OneCricketeer

Reputation: 191973

If I understand correctly, you want to read events from Hive and push them to Kafka. I don't have experience with the storage handlers, but I would rather suggest writing the appropriate code to produce to Kafka, then feeding those events into Hadoop/Hive.

Within Kafka there is a framework called Kafka Connect that writes to external systems. Confluent has written such a connector for HDFS that offers Hive support by updating the Hive metastore whenever a file is written into HDFS.

Without writing a storage handler, you can try using the JDBC Source connector, or otherwise use Spark/Flink to read that data from Hive and push it into Kafka.

Generally, though, Hadoop is the destination for CDC events, not a generating source of them, mostly because it is just slow to query. If you wanted to create events on inserts, it would generally require some table scan, so generating events from Cassandra/HBase might be a better option.

Upvotes: 0
