drobin

Reputation: 276

Spark To Cassandra: Writing Sparse Rows With No Null Values To Cassandra

Q: How do I write only the columns that have values from a Spark DataFrame into Cassandra, and do this efficiently? (Efficiently as in a minimal amount of Scala code, not creating a bunch of tombstones in Cassandra, having it run quickly, etc.)

I have a Cassandra table with two key columns and 300 potential descriptor values.

create table sample (
    key1   text,
    key2   text,
    "0"    text,
    ............
    "299"  text,
    PRIMARY KEY (key1, key2)
);

I have a Spark dataframe that matches the underlying table, but each row in the dataframe is very sparse: other than the two key values, a particular row may have only 4 or 5 of the "descriptors" (columns "0" -> "299") with a value.

I am currently converting the Spark dataframe to an RDD and using saveToCassandra to write the data.

This works, but "null" is stored in columns when there is no value.

For example:

  val saveRdd = sample.rdd

  saveRdd.map(line => (
    line(0), line(1), line(2),
    line(3), line(4), line(5),
    line(6), line(7), line(8),
    line(9), line(10), line(11),
    line(12), line(13), line(14),
    line(15), line(16), line(17),
    line(18), line(19), line(20))).saveToCassandra..........

Creates this in Cassandra:

XYZ | 10 | 49849 | F | | null | null | null | null | null | null | null | null | null | null | | null | null | null | null | null | null | null | null | null | null | TO11142017_Import | null | null | null | null | null | null | null | null | null | null | 20 | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | Scott Dick-Peddie | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | 7/13/2014 0:00 | null | null | null | null | null | null | null | null | null | null | 0 | null | null | null | null | null | null | null | null | null | null | | null | null | null | null | null | null | null | null | null | null | 8 | null | null | null | null | null | null | null | null | null | null | | null | null | null | null | null | null | null | null | null | null | LOCATIONS | null | null | null | null | null | null | null | null | null | null | LOCATIONS | null | null | null | null | null | null | null | null | null | null

Setting spark.cassandra.output.ignoreNulls on SparkSession does not work:

spark.conf.set("spark.cassandra.output.ignoreNulls", "true")
spark.conf.get("spark.cassandra.output.ignoreNulls")

This does not work either:

spark-shell  --conf spark.cassandra.output.ignoreNulls=true

(I have tried setting this in different ways and it does not seem to work however I set it.)

withColumn and filter do not seem to be appropriate solutions. The "unset" concept might be the right thing, but I am not sure how to use it in this case.
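From the connector documentation, the per-value "unset" support appears to look roughly like this (a sketch with hypothetical names - my_keyspace and just two descriptor columns), but I don't see how to scale it to 300 sparse columns per row:

import com.datastax.spark.connector._
import com.datastax.spark.connector.types.CassandraOption

// hypothetical two-descriptor example: column "0" gets a value, column "1"
// is left unset (no null written, no tombstone created)
val unsetRdd = sc.parallelize(Seq(
  ("XYZ", "10", CassandraOption("F"), CassandraOption.Unset)))
unsetRdd.saveToCassandra("my_keyspace", "sample",
  SomeColumns("key1", "key2", "0", "1"))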

Cassandra 3.11.2

spark-cassandra-connector 2.3.0-s_2.11

Spark 2.2.0.2.6.3.0-235

Thank you!

Upvotes: 2

Views: 830

Answers (1)

Alex Ott

Reputation: 87164

Are you sure that ignoreNulls doesn't work for you? cqlsh displays null whenever there is no value in a given cell, regardless of whether anything was actually written there. You can check whether the nulls are really written into the SSTable using the sstabledump tool - if they are, you'll see the cells with deletion information attached (that's how nulls are stored).

Here is an example of running Spark without ignoreNulls (the default) and with ignoreNulls set to true. Testing was done on DSE 5.1.11, which has an older version of the connector, but one matching Cassandra 3.11.

Let's create a test table like this:

create table test.t3 (id int primary key, t1 text, t2 text, t3 text);

Without ignoreNulls, we need the following code for testing:

import com.datastax.spark.connector._   // provides saveToCassandra on RDDs

case class T3(id: Int, t1: Option[String], t2: Option[String], t3: Option[String])
val rdd = sc.parallelize(Seq(T3(1, None, Some("t2"), None)))
rdd.saveToCassandra("test", "t3")
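
As a side note, ignoreNulls can also be requested for a single write by passing a WriteConf explicitly instead of relying on the Spark configuration - a minimal sketch, assuming connector 2.x:

import com.datastax.spark.connector.writer.WriteConf

// sketch only - overrides the configuration default just for this write
rdd.saveToCassandra("test", "t3", writeConf = WriteConf(ignoreNulls = true))

In the test here, however, I rely only on the configuration setting.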

If we look at the data using cqlsh, we will see the following:

cqlsh:test> SELECT * from test.t3;

 id | t1   | t2 | t3
----+------+----+------
  1 | null | t2 | null

(1 rows)

After doing nodetool flush we can look into the SSTables. This is what we'll see:

>sstabledump mc-1-big-Data.db
[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 30,
        "liveness_info" : { "tstamp" : "2018-11-06T07:53:38.418171Z" },
        "cells" : [
          { "name" : "t1", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
          },
          { "name" : "t2", "value" : "t2" },
          { "name" : "t3", "deletion_info" : { "local_delete_time" : "2018-11-06T07:53:38Z" }
          }
        ]
      }
    ]
  }
]

You can see that the columns t1 & t3, which were null, have a deletion_info field.

Now, let's remove the data with TRUNCATE test.t3 and start the Spark shell again with ignoreNulls set to true:

dse spark --conf spark.cassandra.output.ignoreNulls=true

After executing the same Spark code we'll see the same result in cqlsh:

cqlsh:test> SELECT * from test.t3;

 id | t1   | t2 | t3
----+------+----+------
  1 | null | t2 | null

But after performing a flush, sstabledump shows a completely different picture:

>sstabledump mc-3-big-Data.db
[
  {
    "partition" : {
      "key" : [ "1" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 27,
        "liveness_info" : { "tstamp" : "2018-11-06T07:56:27.035600Z" },
        "cells" : [
          { "name" : "t2", "value" : "t2" }
        ]
      }
    ]
  }
]

As you can see, we only have data for column t2, and no mention of columns t1 & t3, which were null.
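
To tie this back to your DataFrame: you shouldn't need to go through an RDD of tuples at all. A sketch of writing the sparse DataFrame directly, assuming the shell was started with --conf spark.cassandra.output.ignoreNulls=true and that your keyspace is called my_keyspace (adjust the names to your schema):

// sample is the sparse DataFrame from the question; with ignoreNulls=true
// the null cells should be skipped on write, just as in the RDD test above
sample.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_keyspace", "table" -> "sample"))
  .mode("append")
  .save()

You can verify the result with nodetool flush and sstabledump, as shown above.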

Upvotes: 2
