Reputation: 2121
I want to stream dates in a specific format into a Cassandra timestamp column.
My incoming dates arrive in the following format:
"%Y-%m-%dT%H:%M:%S.%f"
e.g. "2021-05-18T11:12:13.123456"
My Cassandra table is:
CREATE TABLE table_name (
  id text,
  timestamp timestamp,
  PRIMARY KEY (id)
)
The ingestion of my Spark job is the following:
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkahost)
  .option("subscribe", kafkatopic)
  .option("startingOffsets", "latest")
  .load()
val jsonSchema = StructType(
  List(
    StructField("id", StringType, true),
    StructField("timestamp", StringType, true)
  )
)
val cassandraDF = df
  .withColumn("value", col("value").cast(StringType))
  .withColumn("json", from_json(col("value"), jsonSchema))
  .select("json.*")
At this point, cassandraDF has two columns, id and timestamp, both of type StringType.
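You can confirm this with cassandraDF.printSchema(), which for the schema above prints:
root
 |-- id: string (nullable = true)
 |-- timestamp: string (nullable = true)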
Then, writing to Cassandra is also quite simple:
cassandraDF
  .writeStream
  .outputMode(OutputMode.Append)
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", cassandra_keyspace)
  .option("table", cassandra_table)
  .option("checkpointLocation", "/tmp/checkpoints/cassandra")
  .start()
  .awaitTermination()
When I stream data, I get the following ERROR:
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.IllegalArgumentException: Invalid date: 2021-05-18T11:12:13.123456
So I tried streaming other date formats, for example one without the microseconds:
"%Y-%m-%dT%H:%M:%S"
e.g. "2021-05-18T11:12:13"
and that streams without any problem. This date format gets accepted.
My question is: how can I stream my original date format, with microseconds, into the Cassandra timestamp column?
Upvotes: 1
Views: 427
Reputation: 87154
Cassandra supports only millisecond resolution for the timestamp type. By default it's not allowed to write a string into a timestamp field, but the Spark Cassandra Connector has implicit transformations for cases like this. And if you look into the connector's source code, you'll see that it supports only parsing timestamps with milliseconds.
So the solution would be to convert your timestamp column from string to a Spark timestamp (only on Spark 3.x, where support for microsecond resolution was added):
val cassandraDF = df
  .withColumn("value", col("value").cast(StringType))
  .withColumn("json", from_json(col("value"), jsonSchema))
  .select("json.*")
  .withColumn("timestamp", $"timestamp".cast("timestamp"))
But either way it will be stored in Cassandra with millisecond precision only.
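If you're on Spark 2.x, a minimal workaround sketch (assuming the incoming strings always carry the six-digit fractional part) is to truncate the string to millisecond precision before casting; since Cassandra keeps milliseconds only, nothing extra is lost:
import org.apache.spark.sql.functions.{col, from_json, substring}

val cassandraDF = df
  .withColumn("value", col("value").cast(StringType))
  .withColumn("json", from_json(col("value"), jsonSchema))
  .select("json.*")
  // keep "yyyy-MM-ddTHH:mm:ss.SSS" (the first 23 characters), then cast
  .withColumn("timestamp", substring($"timestamp", 1, 23).cast("timestamp"))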
Upvotes: 1