Reputation: 1655
I am trying a very simple sort operation on a Spark Structured Streaming DataFrame, but it ends with the exception "Exception in thread "main" org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode". Can you please help me with this?
Code:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.from_json
import spark.implicits._

// Read the source topic as a streaming DataFrame
// (kafkaBrokerList, security, srcTopic, groupID and uSchema are defined elsewhere)
val df: DataFrame = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokerList)
  .option("kafka.security.protocol", security)
  .option("startingOffsets", "latest")
  .option("subscribe", srcTopic)
  .option("group.id", groupID)
  .option("failOnDataLoss", false)
  .load()

// Parse the Kafka value as JSON and flatten it into columns
val uDF = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .select($"value")
  .select(from_json($"value", uSchema).as("events"))
  .select($"events.*")

val uDF2 = uDF
  .select($"COL1", $"COL2", $"COL3", $"COL4", $"COL5", $"COL6", $"COL7", $"COL8")
  .sort($"COL5", $"COL3", $"COL8") // this sort triggers the AnalysisException below

// Write the result back to Kafka
val kDF = uDF2
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.security.protocol", "PLAINTEXT")
  .option("topic", "r_topic")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()

kDF.awaitTermination()
Exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;
Data:
I want to sort the DataFrame by "COL5", "COL3" and "COL8":
+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+
|COL1        |COL2                                  |COL3         |COL4 |COL5       |COL6         |COL7      |COL8      |
+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+
|RunKafkaTest|DUMMY VALUE |1528326884394|52.0 |Analog |0 |1528326880|67 |
|RunKafkaTest|DUMMY VALUE |1528326884388|53.0 |Analog |0 |1528326880|68 |
|RunKafkaTest|DUMMY VALUE |1528326886400|54.0 |Analog |0 |1528326880|69 |
|RunKafkaTest|DUMMY VALUE |1528326887412|55.0 |Analog |0 |1528326880|70 |
|RunKafkaTest|DUMMY VALUE |1528326887406|56.0 |Analog |0 |1528326880|71 |
|RunKafkaTest|DUMMY VALUE |1528326889418|57.0 |Analog |0 |1528326880|72 |
|RunKafkaTest|DUMMY VALUE |1528326890423|58.0 |Analog |0 |1528326880|73 |
|RunKafkaTest|DUMMY VALUE |1528326891429|59.0 |Analog |0 |1528326880|74 |
|RunKafkaTest|DUMMY VALUE |1528326892435|1.0 |Analog |0 |1528326880|76 |
|RunKafkaTest|DUMMY VALUE |1528326893449|2.0 |Analog |0 |1528326880|77 |
|RunKafkaTest|DUMMY VALUE |1528326894447|3.0 |Analog |0 |1528326880|78 |
|RunKafkaTest|DUMMY VALUE |1528326895459|4.0 |Analog |0 |1528326880|79 |
|RunKafkaTest|DUMMY VALUE |1528326896458|5.0 |Analog |0 |1528326880|80 |
|RunKafkaTest|DUMMY VALUE |1528326897464|6.0 |Analog |0 |1528326880|81 |
|RunKafkaTest|DUMMY VALUE |1528326898370|7.0 |Analog |0 |1528326880|82 |
|RunKafkaTest|DUMMY VALUE |1528326899476|8.0 |Analog |0 |1528326880|83 |
|RunKafkaTest|DUMMY VALUE |1528326900482|9.0 |Analog |0 |1528326880|84 |
|RunKafkaTest|DUMMY VALUE |1528326901488|10.0 |Analog |0 |1528326880|85 |
|RunKafkaTest|DUMMY VALUE |1528326902493|11.0 |Analog |0 |1528326880|86 |
+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+
Upvotes: 5
Views: 6271
Reputation: 44
You need to apply a group-by/aggregation before the sort (order by), and run the query in Complete output mode, like:
uDF.select($"COL1", $"COL2", $"COL3", $"COL4", $"COL5", $"COL6", $"COL7", $"COL8")
  .groupBy("COL1")
  .agg(max("COL2").as("COL2"), ...)
  .sort(...)
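A minimal runnable sketch of this idea, assuming one output row per COL1 is acceptable; the max aggregates and the console sink are illustrative choices, not from the original code:

import org.apache.spark.sql.functions._

val aggregated = uDF
  .select($"COL1", $"COL2", $"COL3", $"COL4", $"COL5", $"COL6", $"COL7", $"COL8")
  .groupBy($"COL1")
  // illustrative aggregates: pick whichever functions fit your data
  .agg(max($"COL2").as("COL2"), max($"COL3").as("COL3"),
       max($"COL5").as("COL5"), max($"COL8").as("COL8"))
  .sort($"COL5", $"COL3", $"COL8") // allowed now: aggregated DataFrame + Complete mode

aggregated.writeStream
  .format("console")
  .outputMode("complete") // sorting a stream is only supported in Complete output mode
  .option("checkpointLocation", "/tmp/sorted-agg-checkpoint")
  .start()
  .awaitTermination()

Note that Complete mode re-emits the entire sorted result on every trigger, so this only works if the aggregated state stays small.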
Upvotes: 2
Reputation: 1708
You may want to rethink what the output of a sort on a stream would even be. In true streaming you never get a final output, since in theory you never encounter the last event of the stream. Spark actually processes micro-batches, but it tries to keep the semantics close to true streaming. You may end up redefining your problem and leveraging stateful operations such as windowing or flatMapGroupsWithState, as in the sketch below. You might also be able to split the range manually and run it as a batch job.
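A minimal sketch of the windowing idea, assuming COL3 carries an epoch-millis event time; the timestamp conversion, watermark and window sizes are illustrative assumptions:

import org.apache.spark.sql.functions._

val perWindowSorted = uDF
  // assumption: COL3 is the event time in epoch milliseconds
  .withColumn("eventTime", (col("COL3") / 1000).cast("timestamp"))
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "1 minute"))
  // collect each window's rows and sort them by (COL5, COL3, COL8);
  // sort_array orders an array of structs by its fields left to right
  .agg(sort_array(collect_list(struct(col("COL5"), col("COL3"), col("COL8")))).as("sortedRows"))

perWindowSorted.writeStream
  .format("console")
  .outputMode("append") // append works here thanks to the watermark
  .start()
  .awaitTermination()

This yields an array of sorted rows per event-time window rather than a globally sorted stream, which is usually the closest you can get without switching to batch.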
Upvotes: 1