Reputation: 193
I'm trying to insert data from a CSV file into Cassandra using PySpark.
Here is the code.
First, I read the data:
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("nullValue", "NA") \
    .option("timestampFormat", "ddMMMyyyy:HH:mm:ss") \
    .option("quote", "\"") \
    .option("delimiter", ";") \
    .option("mode", "failfast") \
    .load("gs://tidy-centaur-b1/data/PRESCRIPTIONS_ANO.csv")
Edit: I've included the whole code to show the unique key.
from pyspark.sql import functions as F

dfi = df.withColumn("id", F.monotonically_increasing_id()) \
    .withColumnRenamed("CHAIPRAT", "chaiprat") \
    .withColumnRenamed("PRE_PRE_DTD", "pre_pre_dtd") \
    .withColumnRenamed("NbMol", "nbmol") \
    .withColumnRenamed("NumAno", "numano")
dfi.createOrReplaceTempView("prescription")
I count the rows and save the data into Cassandra:
dfi.count()
> 4169826
dfi.write.format("org.apache.spark.sql.cassandra") \
    .mode("overwrite") \
    .option("confirm.truncate", "true") \
    .option("spark.cassandra.connection.host", "10.142.0.4") \
    .option("spark.cassandra.connection.port", "9042") \
    .option("keyspace", "uasb03") \
    .option("table", "prescription") \
    .save()
Now I read the data back from Cassandra and count the rows:
presc = sql.read \
    .format("org.apache.spark.sql.cassandra") \
    .option("spark.cassandra.connection.host", "10.142.0.4") \
    .option("spark.cassandra.connection.port", "9042") \
    .load(table="prescription", keyspace="uasb03")
presc.count()
> 2148762
That's only about half of the first count.
I can't find anything in the log files showing that something went wrong. Does anyone have a clue?
Edit: I tried changing all the timeout values in cassandra.yaml, but presc.count() remains the same.
Edit: here is the Cassandra table description:
cqlsh:uasb03> desc prescription;
CREATE TABLE uasb03.prescription (
    id int PRIMARY KEY,
    chaiprat int,
    nbmol int,
    numano int,
    pre_pre_dtd timestamp
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
To double-check, I also wrote the output to a CSV file, and I got:
chaiprat;pre_pre_dtd;nbmol;numano;id
29100476;03Feb2017:00:00:00;5;378369;8589934592
29100476;24Feb2017:00:00:00;1;378369;8589934593
29100476;27Feb2017:00:00:00;2;378369;8589934594
The id values are bigger than an int can hold.
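A quick check of the arithmetic confirms this: monotonically_increasing_id() builds each id as (partition id << 33) + record number within the partition, so every id from the second Spark partition onwards overflows a 32-bit int:
print(1 << 33)      # 8589934592, the first id of partition 1 (matches the CSV output above)
print(2 ** 31 - 1)  # 2147483647, the largest value a Cassandra int can hold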
Upvotes: 1
Views: 763
Reputation: 87174
The most probable cause is that your data doesn't have a truly unique row identifier that can serve as the partition key, so when you store the data, some values are overwritten. You can fix this by explicitly creating the table with the correct partition key & clustering columns before saving the data. This can be done via the createCassandraTable
call on your data frame (see docs), something like this:
df.createCassandraTable(
    "uasb03", "prescription",
    partitionKeyColumns = Some(Seq("columnA")),
    clusteringKeyColumns = Some(Seq("columnB")))
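If you'd rather stay in Python, the equivalent is to create the table up front, e.g. with the DataStax Python driver. A minimal sketch, assuming (numano, pre_pre_dtd, id) uniquely identifies a row (adjust the key to your data) and declaring id as bigint so the 64-bit Spark ids fit:
from cassandra.cluster import Cluster

# host/port/keyspace taken from the question
cluster = Cluster(["10.142.0.4"], port=9042)
session = cluster.connect("uasb03")
# note: drop the existing uasb03.prescription first, since IF NOT EXISTS
# will not replace the old (id int PRIMARY KEY) definition
session.execute("""
    CREATE TABLE IF NOT EXISTS prescription (
        id bigint,
        chaiprat int,
        nbmol int,
        numano int,
        pre_pre_dtd timestamp,
        PRIMARY KEY ((numano), pre_pre_dtd, id)
    )
""")
cluster.shutdown()
The existing dfi.write ... save() call can then be reused as-is; with confirm.truncate set to true, the overwrite only truncates the data, leaving the schema intact.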
Upvotes: 4