GecKo

Reputation: 173

Cannot insert data into a ClickHouse table from a Kafka stream using JDBC Connect

Kafka and ClickHouse are running in Docker, and I'm trying to insert data from a Kafka stream into a ClickHouse table through JDBC Connect. Querying the stream shows that the data is there. Then I create a ClickHouse table with the same fields as the Kafka stream:

CREATE TABLE IF NOT EXISTS default.table
(
    id Int32, 
    ms_segment_group_id Int32,
    transact_survey_tmpl_id Int32,
    customer_id Int32, 
    customer_account_id Int32,
    agent String,
    comm_channel_id Int32,
    comm_channel_address String,
    answer_date String,
    communication_type_id Int32, 
    object_id Int32,
    create_date String,
    application_id String,
    external_id Int32, 
    transact_survey_state_id Int32, 
    code String
) ENGINE = MergeTree()
  PARTITION BY id
  ORDER BY tuple();

Then I create the JDBC connector in ksql:

    CREATE SOURCE CONNECTOR `clickhouse-jdbc-connector` WITH (
    'connector.class'='io.confluent.connect.jdbc.JdbcSinkConnector',
    'topics'='STREAM_TEST',
    'tasks.max'='1',
    'connection.url'='jdbc:clickhouse://clickhouse:8123/default',
    'table.name.format'='table'
    );  

The output from ksql says that the connector was created and everything is OK. Then I run a SELECT against the ClickHouse table and get an empty result:

(screenshot: empty query result)

As I understand it, this means there is no data in the ClickHouse table, so the JDBC connector isn't working properly?

Upvotes: 0

Views: 1044

Answers (1)

Sounak Saha

Reputation: 943

The easiest way to do this is to change your table engine from MergeTree to Kafka. First, make sure the Kafka topic is actually receiving messages.

) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]
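
Note that a Kafka engine table only consumes messages; it does not store them. The usual pattern is to keep the MergeTree table from the question as the storage target and add a materialized view that copies rows from the Kafka table into it. Below is a minimal sketch of that setup, not a tested configuration. The broker address `kafka:9092`, the consumer group name, the names `table_queue` and `table_mv`, and the `JSONEachRow` format are all assumptions; the sketch also assumes the field names in the messages match the column names exactly.

```sql
-- Kafka engine table: acts as a consumer of the topic, holds no data itself.
-- Columns mirror the MergeTree table from the question.
CREATE TABLE IF NOT EXISTS default.table_queue
(
    id Int32,
    ms_segment_group_id Int32,
    transact_survey_tmpl_id Int32,
    customer_id Int32,
    customer_account_id Int32,
    agent String,
    comm_channel_id Int32,
    comm_channel_address String,
    answer_date String,
    communication_type_id Int32,
    object_id Int32,
    create_date String,
    application_id String,
    external_id Int32,
    transact_survey_state_id Int32,
    code String
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',             -- assumed Docker service name and port
    kafka_topic_list = 'STREAM_TEST',             -- topic name taken from the question
    kafka_group_name = 'clickhouse_stream_test',  -- hypothetical consumer group
    kafka_format = 'JSONEachRow';                 -- assumed; must match the topic's serialization

-- Materialized view: moves each consumed block into the existing MergeTree table.
CREATE MATERIALIZED VIEW IF NOT EXISTS default.table_mv
TO default.`table`
AS SELECT * FROM default.table_queue;
```

Once the view exists, query the MergeTree table rather than `table_queue`: a direct SELECT on the Kafka table consumes messages, so those rows would not reach the view.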

Upvotes: 1
