Sriram R

Reputation: 71

ClickHouse Distributed Table has duplicate rows

I am running a Spark job that inserts data into ClickHouse through the ClickHouse JDBC driver. In a single-node setup, everything works as expected.

But when I run the same job on a multi-node cluster [no replication, 2 shards] and then run SELECT count() on the table, I get 2x the number of rows I inserted.

Here's the exact setup.

Machine 1 - master node with only the Distributed table. Create table query:

CREATE TABLE trial.illogs
(
    userId String, country String, date String, domain String, pathname String,
    platform String, siteId Int64, uniqueId String, receivedTs Int64,
    ua String, clientIp String, receivedDate Date
)
ENGINE = Distributed('trial-cluster', '', 'illogs', rand())

Machine 2 & Machine 3 - Shard with MergeTree Table Create Table Query - CREATE TABLE trial.illogs (userId String, country String, date String, domain String, pathname String, platform String, siteId Int64, uniqueId String, receivedTs Int64, ua String, clientIp String, receivedDate Date) ENGINE = MergeTree ORDER BY receivedTs TTL receivedDate + toIntervalMonth(1) SETTINGS index_granularity = 8192

Here's the config.xml for the shard setup:

<remote_servers>
    <trial-cluster>
        <shard>
            <replica>
                <default_database>trial</default_database>
                <host>node1</host>
                <port>9000</port>
                <password>zxcv</password>
            </replica>
        </shard>
        <shard>
            <replica>
                <default_database>trial</default_database>
                <host>node2</host>
                <port>9000</port>
                <password>zxcv</password>
            </replica>
        </shard>
    </trial-cluster>
</remote_servers>
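
One sanity check is to look at how the node holding the Distributed table resolves this cluster: each shard of trial-cluster should appear exactly once in system.clusters. A minimal query for that (not part of the original setup) would be:

-- Run on the node that owns the Distributed table.
-- A host listed under more than one shard would explain double writes.
SELECT cluster, shard_num, replica_num, host_name, port
FROM system.clusters
WHERE cluster = 'trial-cluster'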

Here's the Spark job that inserts into ClickHouse [I've only included the write part to keep it short]:

df
    .write()
    .format("jdbc")
    .mode(SaveMode.Append)
    .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    .option("url", jdbcUrl)
    .option("user", "default")
    .option("password", pass)
    .option("ssh", "false")
    .option("createTableOptions", createTableOptions)
    .option("dbtable", tableName)
    .option("truncate", "true")
    .option("batchsize", batchSize)
    .option("numPartitions", maxWritePartitions)
    .option("isolationLevel", "NONE")
    .save();

When I ran the same Spark job against the single-node setup, the count of the DataFrame matched what was in ClickHouse. But with 2 shards, the count is exactly 2x what's in the DataFrame.

One option is to split the data in Spark and insert it into each node separately, but I'd rather not handle the sharding logic myself and instead let ClickHouse handle it by inserting directly into the Distributed table.
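
To narrow down where the extra rows live, one can compare the count through the Distributed table with the local count on each shard (a sketch, assuming the host names and credentials from the remote_servers config above). If each shard already holds the full data set, the rows were written twice at insert time.

-- Count through the Distributed table (fans out to both shards).
SELECT count() FROM trial.illogs;

-- Local count on each shard via the remote() table function,
-- using the hosts and password from the remote_servers config.
SELECT count() FROM remote('node1', trial.illogs, 'default', 'zxcv');
SELECT count() FROM remote('node2', trial.illogs, 'default', 'zxcv');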

Upvotes: 0

Views: 1999

Answers (1)

Milen Georgiev

Reputation: 512

You might have a similar issue to the person in this SO question.

It seems that if you set the sharding key to random, the data can end up duplicated across both shards. To avoid the duplication, the suggestion there was to base the sharding key on the primary key of your table.
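
For the table in the question, that might look like the following (a sketch only; using uniqueId as the sharding column is an assumption, any column that consistently identifies a row would do):

CREATE TABLE trial.illogs
(
    userId String, country String, date String, domain String, pathname String,
    platform String, siteId Int64, uniqueId String, receivedTs Int64,
    ua String, clientIp String, receivedDate Date
)
ENGINE = Distributed('trial-cluster', '', 'illogs', cityHash64(uniqueId))

With a deterministic key, repeated inserts of the same row always land on the same shard, which also makes deduplication on that shard possible.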

This answer has more details about deduplication with ReplacingMergeTree.
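
A sketch of what the shard tables could look like with ReplacingMergeTree, assuming rows that share the same uniqueId and receivedTs are true duplicates; note that duplicates are only collapsed during background merges or when querying with FINAL:

CREATE TABLE trial.illogs
(
    userId String, country String, date String, domain String, pathname String,
    platform String, siteId Int64, uniqueId String, receivedTs Int64,
    ua String, clientIp String, receivedDate Date
)
ENGINE = ReplacingMergeTree
ORDER BY (uniqueId, receivedTs)
TTL receivedDate + toIntervalMonth(1)
SETTINGS index_granularity = 8192

-- Rows with the same sorting key are merged into one in the background;
-- to force the merge, or to read deduplicated data immediately:
OPTIMIZE TABLE trial.illogs FINAL;
SELECT count() FROM trial.illogs FINAL;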

Upvotes: 1
