malgosia

Reputation: 11

Watermarking in Spark Structured Streaming 2.3.0

I read data from Kafka in Spark Structured Streaming 2.3.0. The data contains information about teachers: each record has a teacherId, a teacherName and a teacherGroupsIds field. TeacherGroupsIds is an array column that holds the ids of the teacher's groups. My task is to map the column with group ids to a column with the corresponding group names ([1,2,3] => [Suns,Books,Flowers]). The names and ids are stored in HBase and can change every day. Afterwards I have to send the data to another Kafka topic.
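For context, reading the stream from Kafka looks roughly like the sketch below. The broker address, topic name and value schema are placeholders; I assume the value arrives as JSON with the three fields, and I keep the per-record timestamp column that the Kafka source provides.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession.builder().appName("teacher-groups").getOrCreate();

// Assumed JSON value: {"teacherId":1,"teacherName":"...","teacherGroupsIds":[1,2,3]}
StructType teacherSchema = new StructType()
        .add("teacherId", DataTypes.IntegerType)
        .add("teacherName", DataTypes.StringType)
        .add("teacherGroupsIds", DataTypes.createArrayType(DataTypes.IntegerType));

Dataset<Row> inputDataset = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  // placeholder address
        .option("subscribe", "teachers")                    // placeholder topic
        .load()
        .select(from_json(col("value").cast("string"), teacherSchema).as("teacher"),
                col("timestamp"))                           // timestamp column added by the Kafka source
        .select("teacher.*", "timestamp");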

So I read data from two sources: Kafka (streaming) and HBase (batch). I read the data from HBase using the shc library.
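The HBase side is an ordinary batch read through shc; it looks roughly like the sketch below. The catalog (namespace, table name, column family and qualifier names) is only an assumed example mapping group ids to group names.

// Assumed shc catalog: row key = group id, one column holding the group name.
String groupsCatalog = "{"
        + "\"table\":{\"namespace\":\"default\", \"name\":\"groups\"},"
        + "\"rowkey\":\"key\","
        + "\"columns\":{"
        + "\"groupid\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"int\"},"
        + "\"groupname\":{\"cf\":\"info\", \"col\":\"name\", \"type\":\"string\"}"
        + "}}";

Dataset<Row> groupsDataset = spark
        .read()
        .option("catalog", groupsCatalog)  // same key as HBaseTableCatalog.tableCatalog
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load();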

First I explode the array column (group ids) and then join the result with the data from HBase, roughly as sketched below.
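A rough sketch of that step, using the column names assumed above (groupsDataset is the HBase dataframe):

import static org.apache.spark.sql.functions.explode;

// One row per (teacher, group id), then attach the group name from HBase.
Dataset<Row> explodedDataset = inputDataset
        .withColumn("groupid", explode(col("teacherGroupsIds")))
        .join(groupsDataset, "groupid");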

In the next step I would like to aggregate the data by teacherId, but this operation is not supported in Append mode, which I use.

I have tried watermarking, but so far it doesn't work. I added a new timestamp column and group by this column.

import static org.apache.spark.sql.functions.*;

Dataset<Row> inputDataset = ...      // reading from Kafka (as sketched above)

Dataset<Row> explodedDataset = ...   // explode function applied and join with HBase (as sketched above)

Dataset<Row> outputDataset = explodedDataset
        .withColumn("eventTime", current_timestamp())                      // stamp each row with the current time
        .withWatermark("eventTime", "2 minutes")                           // tolerate data up to 2 minutes late
        .groupBy(window(col("eventTime"), "5 seconds"), col("teacherId"))  // 5-second tumbling windows per teacher
        .agg(collect_list(col("groupname")));                              // collect the group names per teacher

The actual result is an empty dataframe at the output: there are no rows at all.

Upvotes: 1

Views: 745

Answers (1)

The problem is current_timestamp().

current_timestamp returns the timestamp at the moment it is evaluated, so if you create a dataframe with this column and print the result right away, you see the current timestamp, but if you keep processing the dataframe and print the same column later, you get a new timestamp.

This approach can work locally, but in a distributed system it sometimes fails because, by the time the workers receive the order to output the data, that data is already outside the watermark's timestamp range.
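For illustration, a more stable choice is an event time that is fixed per record, for example the timestamp column that the Kafka source attaches to every message, assuming that column is kept through the explode/join step; a rough sketch:

Dataset<Row> outputDataset = explodedDataset
        .withWatermark("timestamp", "2 minutes")                           // watermark on the per-record Kafka timestamp
        .groupBy(window(col("timestamp"), "5 seconds"), col("teacherId"))
        .agg(collect_list(col("groupname")));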

Upvotes: 0
