Reputation: 112
I have an entity which resembles a Cassandra table. I am using Spark to save/update the data in Cassandra. Here the entity is the Offer case class:
import java.sql.Timestamp
import java.util

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, product_type: String, writeTime: util.Date)
val offerDataset: Dataset[Offer] = ....
I am saving this data as below:
offerDataset.write.format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> cassandraKeyspace, "table" -> tableName))
  .mode(SaveMode.Append)
  .save()
The cassandra table's schema is
OFFER(offer_id, metadata_last_modified_source_time, product_type)
The problem is how to configure the writeTime field of the Offer entity as the write timestamp when saving to/updating the Cassandra table. The DataStax Spark Cassandra Connector reference - https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md - says to configure it like
writetime=columnName
What I couldn't figure out is what the syntax should look like.
Any help would be greatly appreciated.
Upvotes: 1
Views: 767
Reputation: 87154
This documentation is for the alpha version of the Spark Cassandra Connector, so expect that some things may not work. As pointed out in the documentation, this is a table option, so you can set it via options. You only need to switch the writeTime field from util.Date to Timestamp or Long - Spark SQL has no encoder for the java.util.Date type.
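For illustration, here is a minimal sketch of the failure you would hit if you kept util.Date (BadOffer is a hypothetical name; assumes a SparkSession with spark.implicits._ in scope):

import java.util
// java.util.Date has no built-in Spark SQL encoder
case class BadOffer(offer_id: String, writeTime: util.Date)
// Fails at runtime while deriving the encoder:
// java.lang.UnsupportedOperationException: No Encoder found for java.util.Date
// Seq(BadOffer("123", new util.Date())).toDF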
With the following definitions everything works:
import java.time.Instant
import java.sql.Timestamp

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp,
  product_type: String, writeTime: Long)

// .toDF requires an active SparkSession with spark.implicits._ in scope
val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", 1243124234L),
  Offer("456", Timestamp.from(Instant.now()), "test", 12431242366L)).toDF
Or with Timestamp:
case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp,
  product_type: String, writeTime: Timestamp)

val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", new Timestamp(1243124234L)),
  Offer("456", Timestamp.from(Instant.now()), "test", new Timestamp(12431242366L))).toDF
If we use the following table structure:
create table test.wrt_test (
offer_id text,
metadata_last_modified_source_time timestamp,
product_type text,
primary key(offer_id, metadata_last_modified_source_time));
then you can save the data as follows (only in 3.0-alpha!):
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._
offerDataset.write.cassandraFormat("wrt_test", "test")
  .option("writetime", "writeTime") // here you specify the name of the column holding the write time
  .mode(SaveMode.Append)
  .save()
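If you prefer the format/options style from the question, the same table option should also work in the options map, since cassandraFormat is just shorthand for it (a sketch, equivalent to the call above):

offerDataset.write.format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "wrt_test", "writetime" -> "writeTime"))
  .mode(SaveMode.Append)
  .save()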
But it also works just fine in the current release version if you use the RDD API:
import com.datastax.spark.connector.writer._
offerDataset.rdd.saveToCassandra("test", "wrt_test",
writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))
After writing, in both cases you get the following (the Long values are used as the write time in microseconds, which is what Cassandra's writetime() returns):
cqlsh> select offer_id, metadata_last_modified_source_time, product_type, writetime(product_type) from test.wrt_test;
offer_id | metadata_last_modified_source_time | product_type | writetime(product_type)
----------+------------------------------------+--------------+-------------------------
123 | 2020-04-16 07:28:38.905000+0000 | test | 1243124234
456 | 2020-04-16 07:28:38.905000+0000 | test | 12431242366
(2 rows)
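As a side note, WriteConf is not limited to per-row values - here is a sketch of a constant write time applied to the whole save (the value is microseconds since the epoch), assuming the same table:

import com.datastax.spark.connector.writer._

offerDataset.rdd.saveToCassandra("test", "wrt_test",
  writeConf = WriteConf(timestamp = TimestampOption.constant(1243124234L)))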
Upvotes: 1