Reputation: 6124
I am trying to partition input files based on accountId, but the partitioning should only be done if the DataFrame contains more than 1000 records. The accountId is a dynamic integer that is not known in advance. Consider the following code:
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("input")
lines.print()

lines.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    val df = sqlContext.read.json(rdd)
    val filteredDF = df.filter(df("accountId") === "3")
    if (filteredDF.count() > 1000) {
      df.write.partitionBy("accountId").format("json").save("output")
    }
  }
}

ssc.start()
ssc.awaitTermination()
But the above code partitions every accountId in the dataframe, which is not needed. For example, if the input file has 1500 records for accountId=1 and 10 records for accountId=2, then the dataframe filtered on accountId=1 should be partitioned and written to the output, while the accountId=2 records should be kept in memory.
How can I achieve this using spark-streaming?
Upvotes: 0
Views: 762
Reputation: 570
Shouldn't you be doing
filteredDF.write.partitionBy("accountId").format("json").save("output")
instead of
df.write.partitionBy("accountId").format("json").save("output")
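If the set of accountId values is dynamic and you only want to write out the accounts that exceed 1000 records, one option is to compute the per-account counts first and join back to keep only those accounts. Below is a minimal sketch inside foreachRDD, assuming the same lines DStream, sqlContext and "output" path from your question; the names heavyAccounts and heavyDF are just illustrative:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val df = sqlContext.read.json(rdd)

    // Count the rows per accountId and keep only the ids above the threshold
    val heavyAccounts = df.groupBy("accountId")
      .count()
      .filter(col("count") > 1000)
      .select("accountId")

    // Keep only the rows belonging to those accountIds
    val heavyDF = df.join(heavyAccounts, "accountId")

    if (heavyDF.count() > 0) {
      // Append, since the same output path is written on every batch
      heavyDF.write
        .mode(SaveMode.Append)
        .partitionBy("accountId")
        .format("json")
        .save("output")
    }
  }
}

The rows for accounts below the threshold could then be kept around separately, for example persisted and unioned with the next batch, or tracked with a stateful operation such as updateStateByKey; how best to do that depends on how long they need to be retained.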
Upvotes: 1