Asad Amir Khwaja


ClickHouse materialized view not inserting new rows automatically from source table into target table

I'm using the following workflow to denormalize data in ClickHouse. A ClickPipe moves data from a GCS bucket into a source table (add_to_cart) that stores two columns, user and metadata, as String because they contain JSON. To denormalize the data, I first pause the ClickPipe so that no new data is ingested while I set up the materialized view. Then I do the following steps:

Create a target table for the materialized view:

CREATE TABLE denormalized_add_to_cart (
    `event_type` String, `user_email` String, `user_id` Int64, `stableId` String, `environment` String,
    `timestamp` DateTime, `country` String, `id` Int64, `source` String, `sku` String, `quantity` Int32,
    `price` Float32, `currencyCode` String, `data_source` String, `clicked_from` String, `type` String
)
ENGINE = MergeTree
ORDER BY "timestamp"

Insert the existing data from the source table into the target table:

INSERT INTO denormalized_add_to_cart
SELECT event_type, JSONExtractString(user, 'user_email') AS user_email, JSONExtractInt(user, 'user_id') AS user_id,
JSONExtractString(user, 'stableId') AS stableId, JSONExtractString(user, 'environment') AS environment,
toDateTime(substring(JSONExtractString(user, 'timestamp'), 1, 10)) AS "timestamp", JSONExtractString(user, 'country') AS country,
JSONExtractInt(metadata, 'id') AS id, JSONExtractString(metadata, 'source') AS source,
JSONExtractString(metadata, 'sku') AS sku, JSONExtractInt(metadata, 'quantity') AS quantity,
JSONExtractFloat(metadata, 'price') AS price, JSONExtractString(metadata, 'currencyCode') AS currencyCode,
JSONExtractString(metadata, 'data_source') AS data_source, JSONExtractString(metadata, 'clicked_from') AS clicked_from,
JSONExtractString(metadata, 'type') AS type
FROM add_to_cart

Create a materialized view that writes to the target table, so that any new data inserted into the source table is transformed and added there:

CREATE MATERIALIZED VIEW denormalized_add_to_cart_mv TO denormalized_add_to_cart AS
SELECT event_type, JSONExtractString(user, 'user_email') AS user_email, JSONExtractInt(user, 'user_id') AS user_id,
JSONExtractString(user, 'stableId') AS stableId, JSONExtractString(user, 'environment') AS environment,
toDateTime(substring(JSONExtractString(user, 'timestamp'), 1, 10)) AS "timestamp", JSONExtractString(user, 'country') AS country,
JSONExtractInt(metadata, 'id') AS id, JSONExtractString(metadata, 'source') AS source,
JSONExtractString(metadata, 'sku') AS sku, JSONExtractInt(metadata, 'quantity') AS quantity,
JSONExtractFloat(metadata, 'price') AS price, JSONExtractString(metadata, 'currencyCode') AS currencyCode,
JSONExtractString(metadata, 'data_source') AS data_source, JSONExtractString(metadata, 'clicked_from') AS clicked_from,
JSONExtractString(metadata, 'type') AS type
FROM add_to_cart
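
A materialized view in ClickHouse acts as an insert trigger: it only processes blocks that are inserted into the source table after the view exists. One way to check whether the view fires at all, independently of the ClickPipe, is a manual one-row insert. This is only a sketch, not part of my actual workflow; it assumes the ClickPipe is still paused (so the counts don't move for other reasons) and that one duplicated row in both tables is acceptable.

```sql
-- 1. Note the target row count before the test.
SELECT count() AS target_rows_before FROM denormalized_add_to_cart;

-- 2. Re-insert one existing source row so the data has a realistic shape.
--    CAUTION: this duplicates a row in add_to_cart (and, if the view fires,
--    in denormalized_add_to_cart). On replicated or Cloud engines an identical
--    repeated insert may be deduplicated, so run this step once.
INSERT INTO add_to_cart SELECT * FROM add_to_cart LIMIT 1;

-- 3. If the materialized view fired, this count is exactly one higher.
SELECT count() AS target_rows_after FROM denormalized_add_to_cart;
```

If the count increases here but not when the ClickPipe runs, the view itself works and the problem is more likely in how the pipeline's inserts reach the table.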

After this I resume the ClickPipe, but new data isn't being inserted into the target table of the materialized view, even though it is arriving in the source table. Currently the source table (add_to_cart) has 62,101,000 rows and the target table (denormalized_add_to_cart) has 62,089,000 rows, a gap of 12,000 rows. All rows in the target table were inserted by the INSERT query above, and no row has been added since.
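
For reference, the comparison above can be reproduced with queries along these lines. This is a minimal sketch that uses only the table and view names from this question and assumes all three objects live in the current database.

```sql
-- Compare source and target row counts in a single result row.
SELECT
    (SELECT count() FROM add_to_cart)              AS source_rows,
    (SELECT count() FROM denormalized_add_to_cart) AS target_rows,
    source_rows - target_rows                      AS missing_rows;

-- Confirm that the view exists and inspect the definition the server stored
-- (engine should be MaterializedView, and the query should contain the TO clause).
SELECT name, engine, create_table_query
FROM system.tables
WHERE database = currentDatabase()
  AND name = 'denormalized_add_to_cart_mv';
```

A constant target_rows together with a growing source_rows confirms that rows reach the source table without passing through the view.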
