Reputation: 351
Environment: Scala, Spark, Structured Streaming, Kafka
I have a DF coming from a Kafka stream with the following schema:
DF:
BATCH ID: 0
+-----------------------+-----+---------+------+
|                  value|topic|partition|offset|
+-----------------------+-----+---------+------+
|{"big and nested json"}|    A|        0|     0|
|{"big and nested json"}|    B|        0|     0|
+-----------------------+-----+---------+------+
I want to process each row in parallel using Spark, and I manage to split them across my executors using
DF.repartition(Number).foreach(row=> processRow(row))
I need to extract the value from the value column into its own DataFrame to process it. I'm having difficulties working with the generic Row object.
Is there a way to turn the single row in each executor into its very own DataFrame (using a fixed schema) and write it to a fixed location? Is there a better approach to solve my problem?
EDIT + Clarification:
The DF I'm receiving comes as a batch via the foreachBatch function of the writeStream functionality that exists since Spark 2.4.
Currently, splitting the DF into rows spreads them equally across all my executors; I would like to turn a single GenericRow object into a DataFrame so I can process it using a function I made.
For example, I would send the row to the function
processRow(row: Row)
which takes the value and the topic and turns it back into a single-row DF
+-----------------------+-----+
|                  value|topic|
+-----------------------+-----+
|{"big and nested json"}|    A|
+-----------------------+-----+
for further processing.
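To make the setup concrete, a minimal sketch of what I'm doing (the broker address, subscribed topics, repartition count, and the body of processRow are placeholders):

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

val spark = SparkSession.builder().appName("kafka-rows").getOrCreate()

// Kafka source; value arrives as bytes, so cast it to a string
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "A,B")                       // placeholder topics
  .load()
  .selectExpr("CAST(value AS STRING) AS value", "topic", "partition", "offset")

def processRow(row: Row): Unit = {
  // current per-row processing goes here
}

kafkaDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // what I currently do: spread the rows across the executors
    batchDF.repartition(4).foreach(row => processRow(row))
  }
  .start()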
Upvotes: -1
Views: 1856
Reputation: 10362
I guess you are consuming data from multiple Kafka topics at a time.
First you need to prepare a schema for each Kafka topic; for example, here I have used two different JSONs in the value column.
scala> val df = Seq(("""{"name":"Srinivas"}""","A"),("""{"age":20}""","B")).toDF("value","topic")
scala> df.show(false)
+-------------------+-----+
|value              |topic|
+-------------------+-----+
|{"name":"Srinivas"}|A    |
|{"age":20}         |B    |
+-------------------+-----+
scala> import org.apache.spark.sql.types._
Schema for topic A
scala> val topicASchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
Schema for topic B
scala> val topicBSchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
Combining topic & its schema.
scala> val topicSchema = Seq(("A",topicASchema),("B",topicBSchema)) // Adding Topic & Its Schema.
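As an aside, the same schemas could equivalently be built programmatically with StructType/StructField instead of parsing a JSON string (the *Alt names below are just for illustration):

scala> val topicASchemaAlt = StructType(Seq(StructField("name", StringType, nullable = true)))

scala> val topicBSchemaAlt = StructType(Seq(StructField("age", LongType, nullable = true)))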
Processing DataFrame
scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)))
.foreach(_.show(false)) // Using .par & filtering dataframe based on topic & then applying schema to value column.
+----------+-----+
|value     |topic|
+----------+-----+
|[Srinivas]|A    |
+----------+-----+
+-----+-----+
|value|topic|
+-----+-----+
|[20] |B    |
+-----+-----+
Writing to HDFS
scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)).write.format("json").save(s"/tmp/kafka_data/${d._1}"))
Final data stored in HDFS
scala> import sys.process._
import sys.process._
scala> "tree /tmp/kafka_data".!
/tmp/kafka_data
├── A
│   ├── part-00000-1e854106-49de-44b3-ab18-6c98a126c8ca-c000.json
│   └── _SUCCESS
└── B
    ├── part-00000-1bd51ad7-cfb6-4187-a374-4e2d4ce9cc50-c000.json
    └── _SUCCESS
2 directories, 4 files
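Since the question receives the DF inside foreachBatch, the same per-topic logic could be applied to each micro-batch. A sketch, where kafkaDF stands in for the streaming Kafka source and the output path is a placeholder:

scala> kafkaDF.writeStream
         .foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
           topicSchema.par.foreach { case (topic, schema) =>
             batchDF.filter($"topic" === topic)
               .withColumn("value", from_json($"value", schema))
               .write.mode("append").format("json")
               .save(s"/tmp/kafka_data/$topic")
           }
         }
         .start()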
Upvotes: 1
Reputation: 774
In this case it's better suited to use .map instead of .foreach. The reason is that map returns a new Dataset, while foreach is just a function and doesn't return anything.
One other thing that can help you is to parse the schema located in the JSON. I had a similar requirement recently.
My JSON object has a "similar" schema for both topic A and topic B. If that is not the case for you, you might need to create multiple DataFrames in the solution below by grouping them by topic (see the sketch after the example).
import spark.implicits._ // needed for Dataset.map and the $ column syntax outside spark-shell

// Undo the escaping so the stringified JSON in the value column becomes real nested JSON
val sanitiseJson: String => String = value => value
    .replace("\\\"", "\"")  // unescape inner quotes
    .replace("\\\\", "\\")  // unescape backslashes
    .replace("\n", "")      // drop embedded newlines
    .replace("\"{", "{")    // strip the quotes wrapping the JSON object
    .replace("}\"", "}")

val parsed = df.toJSON
    .map(sanitiseJson)
This will give you something like:
{
"value": { ... },
"topic": "A"
}
Then you can pass that into a new read function:
val dfWithSchema = spark.read.json(parsed)
At this point you would access the value in the nested JSON:
dfWithSchema.select($"value.propertyInJson")
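If the schemas do differ per topic, a sketch of the grouping mentioned above (the topic names are placeholders; each topic gets its own parsed DataFrame):

// One parsed DataFrame per topic, when the schemas differ
val topics = Seq("A", "B")

val parsedByTopic = topics.map { t =>
  val json = df.filter($"topic" === t).toJSON.map(sanitiseJson)
  t -> spark.read.json(json)
}.toMap

// e.g. topic A's frame
parsedByTopic("A").select($"value.propertyInJson")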
There are some optimizations you can do when it comes to sanitiseJson if needed.
Upvotes: 0