Achaius

Reputation: 6124

How to dynamically partition a DataFrame by a column value based on row count

I am trying to partition input files by accountId, but the partitioning should happen only if the DataFrame contains more than 1000 records for that accountId. The accountId is a dynamic integer whose values are not known in advance. Consider the following code:

val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("input")
lines.print()

lines.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    val df = sqlContext.read.json(rdd)
    val filteredDF = df.filter(df("accountId")==="3")
    if (filteredDF.count() > 1000) {
      df.write.partitionBy("accountId").format("json").save("output")
    }
  }
}

ssc.start()
ssc.awaitTermination()

But the above code writes partitions for every accountId, which is not what I want.

  1. I want to find the record count for each accountId in the DataFrame.
  2. If the record count for an accountId exceeds 1000, write that accountId's records, partitioned, to the output.

For example, if the input file has 1500 records for accountId=1 and 10 records for accountId=2, then the records for accountId=1 should be filtered out of the DataFrame and written, partitioned by accountId, to the output, while the accountId=2 records are kept in memory.

How can I achieve this using Spark Streaming?

Upvotes: 0

Views: 762

Answers (1)

khrist safalhai

Reputation: 570

Shouldn't you be doing

filteredDF.write.partitionBy("accountId").format("json").save("output")

instead of

df.write.partitionBy("accountId").format("json").save("output")
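
Note that filteredDF in the question only contains accountId "3", so writing it covers a single hard-coded account. To apply the same idea to every accountId whose count exceeds the limit, one possible approach is to count rows per accountId and join the large ids back onto the original DataFrame. The following is only a sketch under assumptions: the helper name rowsForLargeAccounts and its threshold parameter are hypothetical, and rows with a null accountId are dropped by the inner join.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper (not part of the question's code): returns only the rows
// whose accountId occurs more than `threshold` times in `df`.
def rowsForLargeAccounts(df: DataFrame, threshold: Long): DataFrame = {
  // One row per accountId, with its record count in a column named "count".
  val counts = df.groupBy("accountId").count()

  // Keep just the ids whose count exceeds the threshold.
  val largeIds = counts.filter(col("count") > threshold).select("accountId")

  // The inner join on accountId drops every row that belongs to a small account.
  df.join(largeIds, "accountId")
}
```

Inside foreachRDD this could be used as rowsForLargeAccounts(df, 1000L).write.mode("append").partitionBy("accountId").format("json").save("output"). The mode("append") call is an addition here: the default save mode raises an error when the output path already exists, which would likely happen from the second batch onwards. Rows for smaller accounts are simply excluded from the write; carrying them over to later batches would need extra state and is not covered by this sketch.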

Upvotes: 1
