Bruno Lubascher

Reputation: 2121

How to write datetime with microseconds to Cassandra with Spark?

I want to stream a specific date format into a Cassandra datetime column.

My incoming timestamps arrive in the following format:

"%Y-%m-%dT%H:%M:%S.%f"

e.g. "2021-05-18T11:12:13.123456"
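That format is plain ISO-8601 with fractional seconds, so on the JVM it parses directly. A minimal sketch (using `java.time`, outside of Spark) to confirm the microseconds survive parsing:

```scala
import java.time.LocalDateTime

object ParseDemo {
  // ISO-8601 with fractional seconds parses directly; the fraction
  // lands in the nanosecond-of-second field of LocalDateTime
  def parseIncoming(s: String): LocalDateTime = LocalDateTime.parse(s)

  def main(args: Array[String]): Unit = {
    val ts = parseIncoming("2021-05-18T11:12:13.123456")
    println(ts.getNano) // .123456 is stored as 123456000 nanoseconds
  }
}
```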

My Cassandra table is:

CREATE TABLE table_name (
  id text,
  timestamp timestamp,
  PRIMARY KEY (id)
);

The ingestion of my Spark job is the following:

val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkahost)
    .option("subscribe", kafkatopic)
    .option("startingOffsets", "latest")
    .load()

val jsonSchema = StructType(
      List(
        StructField("id", StringType, true),
        StructField("timestamp", StringType, true)
      )
    )

val cassandraDF = df
    .withColumn("value", col("value").cast(StringType))
    .withColumn("json", from_json(col("value"), jsonSchema))
    .select("json.*")

At this point, cassandraDF has two columns, id and timestamp, both of StringType.

Then, writing to Cassandra is also quite simple:

cassandraDF
    .writeStream
    .outputMode(OutputMode.Append)
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", cassandra_keyspace)
    .option("table", cassandra_table)
    .option("checkpointLocation", "/tmp/checkpoints/cassandra")
    .start()
    .awaitTermination()

When I stream data, I get the following ERROR:

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)

at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.IllegalArgumentException: Invalid date: 2021-05-18T11:12:13.123456

So I tried streaming other date formats. For example one without the microseconds:

"%Y-%m-%dT%H:%M:%S"

e.g. "2021-05-18T11:12:13"

and that streams without any problem. This date format gets accepted.

My question is: how can I write datetimes with microseconds to Cassandra with Spark?

Upvotes: 1

Views: 427

Answers (1)

Alex Ott

Reputation: 87154

Cassandra supports only millisecond resolution for the timestamp type. By default, writing a string into a timestamp field is not allowed, but the Spark Cassandra Connector has implicit conversions for this, and if you look into the source code, you'll see that it only supports parsing timestamps with millisecond precision.

So the solution is to convert your timestamp column from string to a Spark timestamp (note that support for microsecond resolution was added only in Spark 3.x):

val cassandraDF = df
    .withColumn("value", col("value").cast(StringType))
    .withColumn("json", from_json(col("value"), jsonSchema))
    .select("json.*")
    .withColumn("timestamp", $"timestamp".cast("timestamp"))

But it will still be stored in Cassandra with millisecond precision only.
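If you want the truncation to be explicit rather than left to the connector, a minimal sketch of the same millisecond cut-off in plain `java.time` (names here are illustrative, not from the connector):

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

object TruncateDemo {
  // Cassandra's timestamp type keeps milliseconds, so anything finer
  // is dropped on write; truncatedTo(MILLIS) makes that loss explicit
  def toMillis(s: String): LocalDateTime =
    LocalDateTime.parse(s).truncatedTo(ChronoUnit.MILLIS)

  def main(args: Array[String]): Unit = {
    println(toMillis("2021-05-18T11:12:13.123456")) // .123456 becomes .123
  }
}
```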

Upvotes: 1
