Nasreen

Reputation: 112

Spark: How to configure writetime while saving to cassandra

I have an entity that mirrors a Cassandra table, and I am using Spark to save/update the data in Cassandra. The entity here is the Offer case class:

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, product_type: String, writeTime: util.Date)

val offerDataset: Dataset[Offer] = ....

I am saving this data as below

offerDataset.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> cassandraKeyspace, "table" -> tableName))
      .mode(SaveMode.Append)
      .save()

The cassandra table's schema is

OFFER(offer_id, metadata_last_modified_source_time, product_type) 

The problem is how to use the writeTime field of the Offer entity as the write timestamp when saving/updating the Cassandra table. The DataStax reference (https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md) says to configure it like this:

writetime=columnName


What I can't work out is what the syntax should look like.

Any help would be greatly appreciated.

Upvotes: 1

Views: 767

Answers (1)

Alex Ott

Reputation: 87154

This documentation is for the alpha version of the Spark Cassandra Connector, so expect that some things may not work. As the documentation points out, this is a table option, so you can set it via options. You only need to switch from util.Date to the Timestamp or Long type, because Spark SQL doesn't support encoding from java.util.Date.

With the following definitions, everything works:

import java.time.Instant
import java.sql.Timestamp

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, 
  product_type: String, writeTime: Long)

val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", 1243124234L),
  Offer("456", Timestamp.from(Instant.now()), "test", 12431242366L)).toDF

Or with Timestamp:

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, 
   product_type: String, writeTime: Timestamp)

val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", new Timestamp(1243124234L)),
  Offer("456", Timestamp.from(Instant.now()), "test", new Timestamp(12431242366L))).toDF
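If the existing data already carries a java.util.Date, as in the question, the field has to be converted before it goes into the case class. Below is a minimal sketch of that conversion using only the JDK. The object and method names (WriteTimeConversions, toTimestamp, toEpochMicros) are made up for illustration and are not part of Spark or the connector. Cassandra reports WRITETIME in microseconds since the epoch, so the Long variant assumes you want microseconds; check that this unit matches your data.

```scala
import java.sql.Timestamp
import java.util.Date
import java.util.concurrent.TimeUnit

// Hypothetical helper, not part of Spark or the Spark Cassandra Connector.
object WriteTimeConversions {
  // java.util.Date -> java.sql.Timestamp, keeping the same epoch milliseconds
  def toTimestamp(d: Date): Timestamp = new Timestamp(d.getTime)

  // java.util.Date -> microseconds since the epoch
  // (assumption: the writeTime column should hold microseconds)
  def toEpochMicros(d: Date): Long = TimeUnit.MILLISECONDS.toMicros(d.getTime)
}
```

Either function can be applied when building the Offer instances, so that the writeTime field ends up as a Timestamp or a Long.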

If we use the following table structure:

create table test.wrt_test (
  offer_id text,
  metadata_last_modified_source_time timestamp,
  product_type text,
  primary key(offer_id, metadata_last_modified_source_time));

then you can save the data as follows (only in 3.0-alpha!):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._
offerDataset.write.cassandraFormat("wrt_test", "test")
    .option("writetime", "writeTime") // here you specify name of the column with time!
    .mode(SaveMode.Append).save()

It also works just fine in the current release version if you use the RDD API:

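import com.datastax.spark.connector._ // brings saveToCassandra into scope
import com.datastax.spark.connector.writer._
// take the per-row write timestamp from the writeTime column
offerDataset.rdd.saveToCassandra("test", "wrt_test", 
   writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))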

After writing, in both cases you get the following:

cqlsh> select offer_id, metadata_last_modified_source_time, product_type, writetime(product_type) from test.wrt_test;
 offer_id | metadata_last_modified_source_time | product_type | writetime(product_type)
----------+------------------------------------+--------------+-------------------------
      123 |    2020-04-16 07:28:38.905000+0000 |         test |              1243124234
      456 |    2020-04-16 07:28:38.905000+0000 |         test |             12431242366
(2 rows)

Upvotes: 1
