Victoriia

Reputation: 119

Spark Structured Streaming - reading timestamp from file using schema

I am working on a Structured Streaming job.

The data I am reading from files contains the timestamp (in millis), deviceId and a value reported by that device. Multiple devices report data.

I am trying to write a job that aggregates (sums) values sent by all devices into tumbling windows of 1 minute.

The issue I am having is with the timestamp.

When I parse "timestamp" as a long, the window function complains that it expects a timestamp type. When I parse it as TimestampType, as in the snippet below, I get a scala.MatchError exception (the full exception is shown below). I am struggling to figure out why this happens and what the correct way to handle it is.

// Create schema
StructType readSchema = new StructType().add("value" , "integer")
                                        .add("deviceId", "long")
                                        .add("timestamp", new TimestampType());

// Read data from file
Dataset<Row> inputDataFrame = sparkSession.readStream()
                                          .schema(readSchema)
                                          .parquet(path);

Dataset<Row> aggregations = inputDataFrame.groupBy(window(inputDataFrame.col("timestamp"), "1 minutes"),
                                                  inputDataFrame.col("deviceId"))
                                          .agg(sum("value"));

The exception:

 org.apache.spark.sql.types.TimestampType@3eeac696 (of class org.apache.spark.sql.types.TimestampType)
scala.MatchError: org.apache.spark.sql.types.TimestampType@3eeac696 (of class org.apache.spark.sql.types.TimestampType)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeFor(RowEncoder.scala:215)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeForInput(RowEncoder.scala:212)
    at org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType.<init>(objects.scala:1692)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$serializerFor$3(RowEncoder.scala:175)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.serializerFor(RowEncoder.scala:171)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:66)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:232)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:242)
    at org.apache.spark.sql.streaming.DataStreamReader.parquet(DataStreamReader.scala:450)

Upvotes: 0

Views: 673

Answers (1)

Michael Heil

Reputation: 18475

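Typically, when your timestamp is stored as a long in milliseconds, you read it as a long and then convert it into a timestamp type, as shown below. The division by 1000 is needed because casting a numeric value to a timestamp interprets it as seconds since the epoch.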

// Create schema and keep column 'timestamp' as long
StructType readSchema = new StructType()
    .add("value", "integer")
    .add("deviceId", "long")
    .add("timestamp", "long");

// Read data from file
Dataset<Row> inputDataFrame = sparkSession.readStream()
                                          .schema(readSchema)
                                          .parquet(path);

// convert timestamp column into a proper timestamp type
Dataset<Row> df1 = inputDataFrame.withColumn("new_timestamp", expr("timestamp/1000").cast(DataTypes.TimestampType));

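// For inspection only: show() works on a batch read (sparkSession.read()), not on a streaming Dataset
df1.show(false);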

+-----+--------+-------------+-----------------------+
|value|deviceId|timestamp    |new_timestamp          |
+-----+--------+-------------+-----------------------+
|1    |1337    |1618836775397|2021-04-19 14:52:55.397|
+-----+--------+-------------+-----------------------+

df1.printSchema();

root
 |-- value: integer (nullable = true)
 |-- deviceId: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- new_timestamp: timestamp (nullable = true)
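
To make the arithmetic behind the conversion and the 1-minute tumbling window concrete, here is a minimal plain-Java sketch that does not use Spark at all. It assumes that a tumbling window is aligned to the epoch, so a window start is the timestamp rounded down to a multiple of the window size. The class and method names (TumblingWindowSketch, windowStart, sumPerWindow) and the second and third sample rows are hypothetical; only the first timestamp comes from the output above. It sums values across all devices, as described in the question.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    // Start of the epoch-aligned tumbling window that contains the given
    // epoch-millisecond timestamp (hypothetical helper, not a Spark API).
    static Instant windowStart(long epochMillis, Duration size) {
        long sizeMillis = size.toMillis();
        return Instant.ofEpochMilli(Math.floorDiv(epochMillis, sizeMillis) * sizeMillis);
    }

    // Sums values per window. Each input row is {timestampMillis, value}.
    static Map<Instant, Long> sumPerWindow(long[][] rows, Duration size) {
        Map<Instant, Long> sums = new TreeMap<>();
        for (long[] row : rows) {
            sums.merge(windowStart(row[0], size), row[1], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[][] rows = {
            {1618836775397L, 1}, // timestamp taken from the example output
            {1618836779000L, 2}, // made-up row, a few seconds later
            {1618836781000L, 5}  // made-up row, in the following minute
        };
        sumPerWindow(rows, Duration.ofMinutes(1))
            .forEach((start, sum) -> System.out.println(start + " -> " + sum));
    }
}
```

With these sample rows, the first two fall into the window starting at 12:52:00 UTC and the third into the window starting at 12:53:00 UTC. The table above shows 14:52:55.397 for the same instant because Spark renders timestamps in the session time zone.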

Upvotes: 1
