Jice

Reputation: 193

Inserting data from Spark into Cassandra: how to verify everything is OK

I'm trying to insert data from a CSV file into Cassandra using PySpark.

Here is the code.

First, I read the data:

    df = spark.read.format("csv") \
        .option("header","true") \
        .option("inferSchema","true") \
        .option("nullValue","NA") \
        .option("timestampFormat","ddMMMyyyy:HH:mm:ss") \
        .option("quote", "\"") \
        .option("delimiter", ";") \
        .option("mode","failfast") \
        .load("gs://tidy-centaur-b1/data/PRESCRIPTIONS_ANO.csv")

Edit: I've included the whole code to show the unique key.

    from pyspark.sql import functions as F

    dfi = df.withColumn("id", F.monotonically_increasing_id()) \
        .withColumnRenamed("CHAIPRAT", "chaiprat") \
        .withColumnRenamed("PRE_PRE_DTD", "pre_pre_dtd") \
        .withColumnRenamed("NbMol", "nbmol") \
        .withColumnRenamed("NumAno", "numano")


    dfi.createOrReplaceTempView("prescription")

I count the rows and save the data into Cassandra:

    dfi.count()
    > 4169826

    dfi.write.format("org.apache.spark.sql.cassandra") \
        .mode("overwrite") \
        .option("confirm.truncate","true") \
        .option("spark.cassandra.connection.host","10.142.0.4") \
        .option("spark.cassandra.connection.port","9042") \
        .option("keyspace","uasb03") \
        .option("table","prescription") \
        .save()
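
A quick sanity check at this point is whether the generated key is actually unique, by comparing the number of distinct ids to the row count (a sketch reusing dfi from above):

    # Should equal dfi.count() if every row has a distinct id.
    dfi.select("id").distinct().count()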

Now I read the data back from Cassandra and count the rows.

    presc = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .option("spark.cassandra.connection.host","10.142.0.4") \
        .option("spark.cassandra.connection.port","9042") \
        .load(table="prescription", keyspace="uasb03")

    presc.count()
    > 2148762

Only about half of the first count.

I can't find anything in the log files showing that something went wrong. Does anyone have a clue?

Edit: I tried changing all the timeout values in cassandra.yaml, but presc.count() remains the same.

Edit: here is the Cassandra table description:

    cqlsh:uasb03> desc prescription;

    CREATE TABLE uasb03.prescription (
        id int PRIMARY KEY,
        chaiprat int,
        nbmol int,
        numano int,
        pre_pre_dtd timestamp
    ) WITH bloom_filter_fp_chance = 0.01
        AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
        AND comment = ''
        AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
        AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND crc_check_chance = 1.0
        AND dclocal_read_repair_chance = 0.1
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND min_index_interval = 128
        AND read_repair_chance = 0.0
        AND speculative_retry = '99PERCENTILE';

To verify further, I also wrote the output to a CSV file, and I got:

    chaiprat;pre_pre_dtd;nbmol;numano;id
    29100476;03Feb2017:00:00:00;5;378369;8589934592
    29100476;24Feb2017:00:00:00;1;378369;8589934593
    29100476;27Feb2017:00:00:00;2;378369;8589934594

The id values are bigger than an int can hold.
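
8589934592 is 2^33, so ids generated in different Spark partitions collide once narrowed to 32 bits. One way to see how many values survive the narrowing (a sketch; with Spark's default non-ANSI casting, cast("int") truncates overflowing longs):

    # If this is well below dfi.count(), the 32-bit key column explains the missing rows.
    dfi.select(F.col("id").cast("int").alias("id32")).distinct().count()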

Upvotes: 1

Views: 763

Answers (1)

Alex Ott

Reputation: 87174

The most probable cause is that your data don't have a truly unique row identifier that can serve as the partition key, so when you store the data, some rows overwrite others. You can fix this by explicitly creating the table with the correct partition key and clustering columns before saving the data. This can be done via the createCassandraTable call on your data frame (see docs; the API is Scala), something like this:

    df.createCassandraTable(
      "uasb03", "prescription",
      partitionKeyColumns = Some(Seq("columnA")),
      clusteringKeyColumns = Some(Seq("columnB")))
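
In this particular case, the question's own edit points at the concrete problem: monotonically_increasing_id() produces 64-bit longs, while the table declares id int, so distinct longs collapse onto the same 32-bit key and overwrite one another. Below is a minimal sketch of recreating the table with a bigint key from Python before re-running the save (an illustration, assuming the cassandra-driver package; host and keyspace are the ones from the question):

    from cassandra.cluster import Cluster

    # Connect to the same node the Spark job writes to.
    cluster = Cluster(["10.142.0.4"], port=9042)
    session = cluster.connect()

    # Recreate the table with a 64-bit primary key so the generated ids fit.
    session.execute("DROP TABLE IF EXISTS uasb03.prescription")
    session.execute("""
        CREATE TABLE uasb03.prescription (
            id bigint PRIMARY KEY,
            chaiprat int,
            nbmol int,
            numano int,
            pre_pre_dtd timestamp
        )
    """)
    cluster.shutdown()

With the wider key column, no two generated ids map to the same row, and presc.count() should match the source count after the write.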

Upvotes: 4
