Reputation: 71
I am running a Spark job that inserts data into ClickHouse through the ClickHouse JDBC driver. In a single-node setup, everything works as expected.
But when I run the same job on a multi-node cluster [ no replication, 2 shards ] and then run SELECT count() on the table, I get 2x the number of rows I inserted.
Here's the exact setup:
Machine 1 - Master node with only Distributed Table
Create Table Query:
CREATE TABLE trial.illogs
(
    userId String, country String, date String, domain String, pathname String,
    platform String, siteId Int64, uniqueId String, receivedTs Int64, ua String,
    clientIp String, receivedDate Date
)
ENGINE = Distributed('trial-cluster', '', 'illogs', rand())
Machine 2 & Machine 3 - Shards, each with a MergeTree table
Create Table Query:
CREATE TABLE trial.illogs
(
    userId String, country String, date String, domain String, pathname String,
    platform String, siteId Int64, uniqueId String, receivedTs Int64, ua String,
    clientIp String, receivedDate Date
)
ENGINE = MergeTree
ORDER BY receivedTs
TTL receivedDate + toIntervalMonth(1)
SETTINGS index_granularity = 8192
Here's the remote_servers section of config.xml for the sharded setup:
<remote_servers>
    <trial-cluster>
        <shard>
            <replica>
                <default_database>trial</default_database>
                <host>node1</host>
                <port>9000</port>
                <password>zxcv</password>
            </replica>
        </shard>
        <shard>
            <replica>
                <default_database>trial</default_database>
                <host>node2</host>
                <port>9000</port>
                <password>zxcv</password>
            </replica>
        </shard>
    </trial-cluster>
</remote_servers>
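The cluster as ClickHouse resolved it can be inspected from the node that holds the Distributed table with a query like this (system.clusters is a built-in table):

SELECT cluster, shard_num, replica_num, host_name, port
FROM system.clusters
WHERE cluster = 'trial-cluster'

This lists each shard and replica the Distributed table will fan inserts out to.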
Here's the Spark job that inserts into ClickHouse [ I've only included the write part to keep it short ]:
df
.write()
.format("jdbc")
.mode(SaveMode.Append)
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("url", jdbcUrl)
.option("user", "default")
.option("password", pass)
.option("ssh", "false")
.option("createTableOptions", createTableOptions)
.option("dbtable", tableName)
.option("truncate", "true")
.option("batchsize", batchSize)
.option("numPartitions", maxWritePartitions)
.option("isolationLevel", "NONE")
.save();
When I ran the same Spark job against the single-node setup, the DataFrame count matched what ended up in ClickHouse. But with 2 shards, the count in ClickHouse is exactly 2x the DataFrame count.
One option is to split the data in Spark and insert into each node separately, but I'd rather not handle the sharding logic myself; I want ClickHouse to handle it by inserting directly into the Distributed table.
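A quick way to see how the rows ended up distributed is to count the local tables on each shard through the cluster() table function, something like:

SELECT hostName() AS host, count() AS cnt
FROM cluster('trial-cluster', 'trial', 'illogs')
GROUP BY host

If each host reports the full DataFrame count rather than roughly half of it, the rows are being written to both shards instead of being split between them.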
Upvotes: 0
Views: 1999
Reputation: 512
You might be hitting a similar issue to the one in this SO question.
It seems that if the sharding key is random, the data can end up duplicated on both replicas. To avoid the duplication, it was suggested to set the sharding key based on your table's primary key instead.
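As a rough sketch of that suggestion (cityHash64 over the uniqueId column is just an example; use whatever key identifies a row in your data), the Distributed table could look like:

CREATE TABLE trial.illogs
(
    userId String, country String, date String, domain String, pathname String,
    platform String, siteId Int64, uniqueId String, receivedTs Int64, ua String,
    clientIp String, receivedDate Date
)
ENGINE = Distributed('trial-cluster', '', 'illogs', cityHash64(uniqueId))

With a deterministic key, rows with the same uniqueId always land on the same shard, which also makes per-shard deduplication possible.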
This answer has more details about deduplication with ReplacingMergeTree.
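For reference, a minimal sketch of the per-shard table using ReplacingMergeTree (assuming uniqueId plus receivedTs identifies a logical row; duplicates are only collapsed in the background at merge time, so counts can stay inflated until merges run or the query uses FINAL):

CREATE TABLE trial.illogs
(
    userId String, country String, date String, domain String, pathname String,
    platform String, siteId Int64, uniqueId String, receivedTs Int64, ua String,
    clientIp String, receivedDate Date
)
ENGINE = ReplacingMergeTree
ORDER BY (uniqueId, receivedTs)
TTL receivedDate + toIntervalMonth(1)
SETTINGS index_granularity = 8192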
Upvotes: 1