nilesh1212

Reputation: 1655

Sort operation on Spark Structured Streaming DataFrame

I am trying a very simple sort operation on a Spark Structured Streaming DataFrame, but it fails with the `AnalysisException` shown below ("Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode"). Can you please help me with this?

Code:

    val df: DataFrame = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokerList)
      .option("kafka.security.protocol", security)
      .option("startingOffsets", "latest")
      .option("subscribe", srcTopic)
      .option("group.id", groupID)
      .option("failOnDataLoss", false)
      .load

    val uDF = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .select($"value")
      .select(from_json($"value", uSchema).as("events"))
      .select($"events.*")

    val uDF2 = uDF
      .select($"COL1", $"COL2", $"COL3", $"COL4", $"COL5", $"COL6", $"COL7", $"COL8")
      .sort($"COL5", $"COL3", $"COL8")

    val kDF = uDF2
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("kafka.security.protocol", "PLAINTEXT")
      .option("topic", "r_topic")
      .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
      .start()

    kDF.awaitTermination()

Exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;

DATA:

I want to sort the DataFrame by "COL5", "COL3", and "COL8".

+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+
|COL1        |COL2                                  |COL3         |COL4 |COL5       |COL6         |COL7      |COL8      |
+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+
|RunKafkaTest|DUMMY VALUE                           |1528326884394|52.0 |Analog     |0            |1528326880|67        |
|RunKafkaTest|DUMMY VALUE                           |1528326884388|53.0 |Analog     |0            |1528326880|68        |
|RunKafkaTest|DUMMY VALUE                           |1528326886400|54.0 |Analog     |0            |1528326880|69        |
|RunKafkaTest|DUMMY VALUE                           |1528326887412|55.0 |Analog     |0            |1528326880|70        |
|RunKafkaTest|DUMMY VALUE                           |1528326887406|56.0 |Analog     |0            |1528326880|71        |
|RunKafkaTest|DUMMY VALUE                           |1528326889418|57.0 |Analog     |0            |1528326880|72        |
|RunKafkaTest|DUMMY VALUE                           |1528326890423|58.0 |Analog     |0            |1528326880|73        |
|RunKafkaTest|DUMMY VALUE                           |1528326891429|59.0 |Analog     |0            |1528326880|74        |
|RunKafkaTest|DUMMY VALUE                           |1528326892435|1.0  |Analog     |0            |1528326880|76        |
|RunKafkaTest|DUMMY VALUE                           |1528326893449|2.0  |Analog     |0            |1528326880|77        |
|RunKafkaTest|DUMMY VALUE                           |1528326894447|3.0  |Analog     |0            |1528326880|78        |
|RunKafkaTest|DUMMY VALUE                           |1528326895459|4.0  |Analog     |0            |1528326880|79        |
|RunKafkaTest|DUMMY VALUE                           |1528326896458|5.0  |Analog     |0            |1528326880|80        |
|RunKafkaTest|DUMMY VALUE                           |1528326897464|6.0  |Analog     |0            |1528326880|81        |
|RunKafkaTest|DUMMY VALUE                           |1528326898370|7.0  |Analog     |0            |1528326880|82        |
|RunKafkaTest|DUMMY VALUE                           |1528326899476|8.0  |Analog     |0            |1528326880|83        |
|RunKafkaTest|DUMMY VALUE                           |1528326900482|9.0  |Analog     |0            |1528326880|84        |
|RunKafkaTest|DUMMY VALUE                           |1528326901488|10.0 |Analog     |0            |1528326880|85        |
|RunKafkaTest|DUMMY VALUE                           |1528326902493|11.0 |Analog     |0            |1528326880|86        |
+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+

Upvotes: 5

Views: 6271

Answers (2)

user7460598

Reputation: 44

You need to aggregate (group by) before you sort (order by), and, as the error message says, write the result in Complete output mode. For example:

uDF.select($"COL1", $"COL2", $"COL3", $"COL4", $"COL5", $"COL6", $"COL7", $"COL8")
   .groupBy("COL1")
   .agg(max("COL2").as("COL2")......).sort("........")
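To make the idea above concrete, here is a minimal sketch rather than a drop-in fix. It assumes Spark 2.x or later and swaps the Kafka source for the built-in `rate` source so that it runs locally; the `bucket` column and the names `SortedAggregateSketch` and `sortedCounts` are made up for illustration. The point is only that `sort` passes analysis once it follows an aggregation and the query is written with `outputMode("complete")`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, max}

object SortedAggregateSketch {

  // Aggregate first, then sort. On a streaming DataFrame this is only
  // accepted when the query is written in Complete output mode.
  // Expects a numeric column named "value" (hypothetical input shape).
  def sortedCounts(events: DataFrame): DataFrame =
    events
      .withColumn("bucket", col("value") % 3)
      .groupBy(col("bucket"))
      .agg(count("*").as("n"), max(col("value")).as("maxValue"))
      .sort(col("bucket"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sorted-aggregate-sketch")
      .master("local[2]")
      .getOrCreate()

    // The "rate" source stands in for Kafka here; it emits (timestamp, value) rows.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    val query = sortedCounts(events)
      .writeStream
      .outputMode("complete") // required for sort on a streaming aggregate
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Keep in mind that Complete mode re-emits the whole aggregated result on every trigger, so this fits small, bounded result tables better than a raw event feed.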

Upvotes: 2

Jungtaek Lim

Reputation: 1708

You may want to rethink what the output of a sort on a stream should be. In true streaming you would never get any output, because in theory you never reach the last event of the stream. Although Spark actually runs micro-batches, it tries to keep its semantics close to those of true streaming. You may end up redefining your problem and leveraging stateful operations such as windowing or flatMapGroupsWithState. You might also be able to split the data into ranges manually and run batch jobs over them.
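As a rough illustration of the last suggestion (splitting the range manually and running batch), the sketch below reads a bounded slice of the topic with the batch Kafka source (`spark.read` instead of `spark.readStream`), where `sort` is allowed. It assumes the `spark-sql-kafka-0-10` package is on the classpath and that the question's `uSchema` is a `StructType`; the object and method names and their parameters are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

object SortKafkaRangeSketch {

  // Batch read of a bounded offset range. startingOffsets accepts "earliest"
  // or a per-partition JSON string; endingOffsets accepts "latest" or JSON.
  def readRange(spark: SparkSession,
                brokers: String,
                topic: String,
                startingOffsets: String,
                endingOffsets: String): DataFrame =
    spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .option("startingOffsets", startingOffsets)
      .option("endingOffsets", endingOffsets)
      .load()

  // Parse the Kafka `value` column as JSON and sort by the columns from the
  // question. This works because `raw` is a batch DataFrame, not a stream.
  def parseAndSort(raw: DataFrame, schema: StructType): DataFrame =
    raw
      .select(from_json(col("value").cast("string"), schema).as("events"))
      .select("events.*")
      .sort(col("COL5"), col("COL3"), col("COL8"))
}
```

A job could then call `parseAndSort(readRange(spark, brokers, topic, "earliest", "latest"), uSchema)` on a schedule and write each sorted slice to the sink; how the offset ranges are chosen and tracked between runs is left open here.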

Upvotes: 1
