I have a Spark Dataset of an entity that should be saved/updated into a Cassandra table named 'offer'.
case class Offer(offer_id: String, metadata_last_modified_source_time: java.sql.Timestamp, product_type: String)
val offerDataset: Dataset[Offer] = ....
I want to save or update 'offerDataset' in Cassandra, with each row's write timestamp taken from the "metadata_last_modified_source_time" field of the 'offer' entity.
offerDataset.rdd.saveToCassandra("cassandra_keyspace", "cassandra_table", writeConf = WriteConf(timestamp = TimestampOption.perRow("metadata_last_modified_source_time")))
While writing to Cassandra, I get the exception below. Can somebody help me understand this issue? I got the same error when 'metadata_last_modified_source_time' was a java.util.Date or a Long instead of a Timestamp.
com.datastax.driver.core.exceptions.InvalidTypeException: Value metadata_last_modified_source_time is of type bigint, not timestamp
at com.datastax.driver.core.AbstractGettableByIndexData.checkType(AbstractGettableByIndexData.java:83)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:529)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:536)
at com.datastax.driver.core.BoundStatement.set(BoundStatement.java:870)
at com.datastax.spark.connector.writer.BoundStatementBuilder.com$datastax$spark$connector$writer$BoundStatementBuilder$$bindColumnNull(BoundStatementBuilder.scala:59)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$5.apply(BoundStatementBuilder.scala:83)
I found a solution after going through this doc: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
I introduced a new field, writeTime, in the Offer case class to supply the Cassandra write timestamp for each row.
case class Offer(offer_id: String, metadata_last_modified_source_time: java.sql.Timestamp, product_type: String, writeTime: java.sql.Date)
While building offerDataset, I set the writeTime field's value from metadata_last_modified_source_time:
val offerDataset: Dataset[Offer] = {....
....
val writeTime = new java.sql.Date(metadata_last_modified_source_time.getTime())
....
....
}
offerDataset.rdd.saveToCassandra("cassandra_keyspace", "cassandra_table", writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))
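The fix seems to work because the per-row timestamp now has its own name. When the placeholder was "metadata_last_modified_source_time", that name apparently referred both to a regular timestamp column and to the bigint USING TIMESTAMP value, so no single type could satisfy both. That would explain why Timestamp, util.Date and Long all failed. A possible variant, sketched below under assumptions, keeps the separately named field but makes it a Long holding microseconds since the epoch, which is the conventional unit for USING TIMESTAMP. One reason to consider it is that Spark encodes a java.sql.Date field in a Dataset as DateType (a calendar day), so the time of day may be lost before the value reaches Cassandra. The names OfferWithWriteTime, OfferWriteTime, toMicros and withWriteTime are hypothetical. The connector call is shown only in comments and has not been verified here.

```scala
import java.sql.Timestamp

// Entity from the question.
case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, product_type: String)

// Hypothetical variant with a separately named write-time field, in microseconds since the epoch.
case class OfferWithWriteTime(
    offer_id: String,
    metadata_last_modified_source_time: Timestamp,
    product_type: String,
    writeTime: Long)

object OfferWriteTime {
  // Convert a java.sql.Timestamp to microseconds since the epoch:
  // whole seconds (floored, so pre-1970 values stay correct) plus the sub-second part from getNanos.
  def toMicros(ts: Timestamp): Long =
    Math.floorDiv(ts.getTime, 1000L) * 1000000L + ts.getNanos / 1000

  // Copy an Offer and derive writeTime from metadata_last_modified_source_time.
  def withWriteTime(o: Offer): OfferWithWriteTime =
    OfferWithWriteTime(
      o.offer_id,
      o.metadata_last_modified_source_time,
      o.product_type,
      toMicros(o.metadata_last_modified_source_time))

  // Assumed usage with Spark and the spark-cassandra-connector (not compiled here; needs
  // `import com.datastax.spark.connector._`, `import com.datastax.spark.connector.writer._`
  // and `import spark.implicits._`):
  //
  //   offerDataset.map(OfferWriteTime.withWriteTime).rdd.saveToCassandra(
  //     "cassandra_keyspace", "cassandra_table",
  //     writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))

  def main(args: Array[String]): Unit = {
    val ts = new Timestamp(1546300800123L) // 2019-01-01T00:00:00.123Z
    ts.setNanos(123456000)                 // add sub-millisecond precision: .123456
    val row = withWriteTime(Offer("offer-1", ts, "some_product"))
    println(row.writeTime) // prints 1546300800123456
  }
}
```

Because writeTime does not match any table column, the connector should use it only as the per-row timestamp. The microsecond conversion keeps distinct source times distinct, down to the precision Cassandra stores.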