Reputation: 481
TLDR Summary: ClickHouse Kafka engine, materialized view won't work with a complex SELECT statement.
Longer Version:
I am trying to send a large number of JSON data points to ClickHouse via its Kafka engine using JSONEachRow, but the materialized view won't consume the stream correctly. I have a Kafka producer written in Go, which takes data from multiple TCP streams and asynchronously writes to the Kafka queue.
Data flows thus:
TCP Sources -> Producer -> Kafka -> ClickHouse (Kafka Engine) -> Materialized View -> Destination Table
All this works, so far so good.
I first hit a bottleneck when I ramped up the speed of the input data (400,000 points/sec): my producer could not write to Kafka fast enough and the connections piled up. So I hoped to batch the data, but it seems ClickHouse cannot take an array of JSON as input (https://clickhouse.yandex/docs/en/interfaces/formats/).
So I hit on the idea of batching the data points at their source and transforming the messages in the materialized view. Where before I had lots of individual messages:
{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}" }
I now have a message which is multiples of the above, stringified, with newline delimiters between the points:
{"realtimes":"{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}\n{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}"}
The intention here is to parse and transform the string into multiple rows using visitParamExtract in the SELECT statement of the materialized view.
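Outside of the Kafka pipeline, the extraction logic itself can be sanity-checked against a literal batched message. This is only an illustrative query, using the same field names as above:

SELECT
    visitParamExtractInt(x, 't') AS timestamp,
    visitParamExtractString(x, 'i') AS device_id,
    visitParamExtractInt(x, 'v') AS value
FROM
(
    -- split the batched string on newlines and expand it to one row per point
    SELECT arrayJoin(splitByChar('\n', concat(
        '{"t":1547458266855015791,"i":"device_2","c":20001,"v":56454654}', '\n',
        '{"t":1547458266855015791,"i":"device_2","c":20001,"v":56454654}'))) AS x
)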
Materialized View:
CREATE MATERIALIZED VIEW ltdb_mat_view TO default.ltdb AS
SELECT
    visitParamExtractInt(x, 't') AS timestamp,
    visitParamExtractString(x, 'i') AS device_id,
    visitParamExtractInt(x, 'v') AS value
FROM
(
    SELECT arrayJoin(*) AS x
    FROM
    (
        SELECT splitByChar('\n', realtimes)
        FROM kafka_stream_realtimes
    )
)
It seems to be doing something: while it is running, kafka_stream_realtimes gets cleared and I can't query it manually (I get the error "DB::Exception: Failed to claim consumer: ."), but the data never hits the final table.
For completeness, kafka_stream_realtimes:
CREATE TABLE IF NOT EXISTS kafka_stream_realtimes(realtimes String)
ENGINE = Kafka('kafka:9092', 'realtimes', 'groupTest', 'JSONEachRow');
ltdb:
CREATE TABLE default.ltdb
(
    timestamp Int64,
    device_id String,
    value Int64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(toDateTime(round(timestamp / 1000000000)))
ORDER BY (device_id, value)
SETTINGS index_granularity = 8192;
Upvotes: 2
Views: 3783
Reputation: 3276
"but it seems ClickHouse cannot take an array of JSON as input"
It seems the motivation is to do batch commits on the producer side. Why not just group a bunch of JSON rows and commit them in one go? ClickHouse will receive those multi-row messages and parse them for you. You may also need to provide the kafka_row_delimiter setting to the Kafka engine, as most Kafka producers don't append a row delimiter at the end of each message; a sketch of the resulting DDL follows the example below.
So one message becomes
{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445402,"i": "device_2","c": 20001,"v": 56454654}
{ "t": 1547457441651445403,"i": "device_2","c": 20001,"v": 56454654}
...
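As a rough sketch (not tested against your setup), the Kafka table could then declare the JSON fields as typed columns so that JSONEachRow does the parsing, and the materialized view becomes a plain column mapping. The column names t/i/c/v and their types are taken from the question; the exact kafka_row_delimiter syntax may differ depending on your ClickHouse version:

CREATE TABLE IF NOT EXISTS kafka_stream_realtimes
(
    t Int64,
    i String,
    c Int64,
    v Int64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'realtimes',
         kafka_group_name = 'groupTest',
         kafka_format = 'JSONEachRow',
         kafka_row_delimiter = '\n';

CREATE MATERIALIZED VIEW ltdb_mat_view TO default.ltdb AS
SELECT
    t AS timestamp,
    i AS device_id,
    v AS value
FROM kafka_stream_realtimes;

On the producer side, the batch is then simply the individual JSONEachRow lines concatenated with newlines and committed as a single Kafka message.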
Upvotes: 2