Leonard Feehan

Reputation: 481

ClickHouse not consuming Kafka messages via complex materialized view

TL;DR: A ClickHouse Kafka engine materialized view won't work with a complex SELECT statement.

Longer Version:

I am trying to send a large number of JSON data points to ClickHouse via its Kafka engine using JSONEachRow, but the materialized view won't consume the stream correctly. I have a Kafka producer written in Go, which takes data from multiple TCP streams and asynchronously writes to the Kafka queue.

Data flows thus:

TCP Sources -> Producer -> Kafka -> ClickHouse (Kafka Engine) -> Materialized View -> Destination Table

All this works, so far so good.

I first hit a bottleneck when I ramped up the input rate (400,000 points/sec): my producer could not write to Kafka fast enough and the connections piled up. So I hoped to batch the data, but it seems ClickHouse cannot take an array of JSON as input (https://clickhouse.yandex/docs/en/interfaces/formats/).
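For reference, JSONEachRow expects one JSON object per line rather than a JSON array. A minimal manual insert illustrating the shape (the values are made up; column names taken from the ltdb table below):

-- hypothetical manual insert, just to show the newline-separated shape JSONEachRow expects
INSERT INTO default.ltdb FORMAT JSONEachRow
{"timestamp": 1547457441651445401, "device_id": "device_2", "value": 56454654}
{"timestamp": 1547457441651445402, "device_id": "device_2", "value": 56454655}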

So I hit on the idea of batching the data points at their source and transforming the messages in the materialized view. Where before I had lots of individual messages:

{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}" }

I now have a single message containing multiples of the above, stringified, with newline delimiters between the points:

{"realtimes":"{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}\n{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}"}

The intention here is to parse the string and transform it into multiple rows using visitParamExtract in the SELECT statement of the materialized view.

Materialized View:

CREATE MATERIALIZED VIEW ltdb_mat_view TO default.ltdb AS
SELECT
    visitParamExtractInt(x, 't') AS timestamp,
    visitParamExtractString(x, 'i') AS device_id,
    visitParamExtractInt(x, 'v') AS value
FROM
(
    SELECT arrayJoin(*) AS x
    FROM
    (
        SELECT splitByChar('\n', realtimes)
        FROM kafka_stream_realtimes
    )
)

It seems to be doing something: while the view is running, kafka_stream_realtimes gets cleared, and if I try to query it manually I get the error "DB::Exception: Failed to claim consumer: ." But the data never hits the final table.
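As a sanity check, the same parsing logic can be run outside the Kafka engine against a literal string (a minimal sketch; the values are made up):

-- standalone check of the string splitting and visitParam extraction, independent of Kafka
SELECT
    visitParamExtractInt(x, 't') AS timestamp,
    visitParamExtractString(x, 'i') AS device_id,
    visitParamExtractInt(x, 'v') AS value
FROM
(
    SELECT arrayJoin(splitByChar('\n', '{"t":1547458266855015791,"i":"device_2","c":20001,"v":56454654}\n{"t":1547458266855015792,"i":"device_2","c":20001,"v":56454655}')) AS x
)

If that returns the expected rows, the SELECT itself is fine and the problem is more likely in the Kafka consumption.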

For completeness, the full table definitions:

kafka_stream_realtimes:

CREATE TABLE IF NOT EXISTS kafka_stream_realtimes(realtimes String)
  ENGINE = Kafka('kafka:9092', 'realtimes', 'groupTest', 'JSONEachRow');

ltdb:

CREATE TABLE default.ltdb (timestamp Int64, device_id String, value Int64) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(toDateTime(round(timestamp/1000000000)))
ORDER BY (device_id, value)
SETTINGS index_granularity=8192;

Upvotes: 2

Views: 3783

Answers (1)

Amos

Reputation: 3276

but it seems Clickhouse cannot take an array of json as input

It seems the motivation is to do a batch commit on the producer side. Why not just group a bunch of JSON rows and commit them in one go? ClickHouse will receive those multi-row messages and parse them for you. You may also need to provide the kafka_row_delimiter setting to the Kafka engine, as most Kafka producers don't append a row delimiter at the end of each message.

So one message becomes

{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445402,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445403,"i": "device_2","c": 20001,"v": 56454654}
...
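If the delimiter does turn out to be needed, a rough sketch of the Kafka table with kafka_row_delimiter set and the original per-point columns could look like this (SETTINGS-style syntax; the column types are assumptions):

CREATE TABLE IF NOT EXISTS kafka_stream_realtimes
(
    t Int64,   -- timestamp in nanoseconds
    i String,  -- device id
    c Int32,   -- assumed type for the 'c' field
    v Int64    -- value
)
ENGINE = Kafka SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'realtimes',
    kafka_group_name = 'groupTest',
    kafka_format = 'JSONEachRow',
    kafka_row_delimiter = '\n';

The materialized view then no longer needs any string splitting, e.g. SELECT t AS timestamp, i AS device_id, v AS value FROM kafka_stream_realtimes.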

Upvotes: 2
