coding_potato

Reputation: 351

Operating in parallel on Spark DataFrame rows

Environment: Scala, Spark, Structured Streaming, Kafka

I have a DF coming from a Kafka stream with the following schema:

DF:

BATCH ID: 0
+-----------------------+-----+---------+------+
|                  value|topic|partition|offset|
+-----------------------+-----+---------+------+
|{"big and nested json"}|  A  |        0|     0|
|{"big and nested json"}|  B  |        0|     0|
+-----------------------+-----+---------+------+

I want to process each row in parallel using Spark, and I manage to split them across my executors using

DF.repartition(Number).foreach(row => processRow(row))

I need to extract the value from the value column into its own DataFrame to process it. I'm having difficulties working with the generic DataFrame Row object.

Is there a way to turn the single row in each executor into its very own DataFrame (using a fixed schema?) and write it to a fixed location? Is there a better approach to solve my problem?

EDIT + Clarification:

The DF I'm receiving arrives as a batch via the foreachBatch function of the writeStream functionality that has existed since Spark 2.4.
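For context, a minimal sketch of such a setup, assuming a Kafka source (broker address and topic names are placeholders):

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("kafka-batches").getOrCreate()

// Streaming read from Kafka (bootstrap servers and topic names are placeholders).
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "A,B")
  .load()
  .selectExpr("CAST(value AS STRING) AS value", "topic", "partition", "offset")

// foreachBatch (Spark 2.4+) hands each micro-batch to the callback as a plain DataFrame.
stream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    println(s"BATCH ID: $batchId")
    batchDF.show(false)
  }
  .start()
  .awaitTermination()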

Currently, splitting the DF into rows means the rows are distributed equally across all my executors. I would like to turn a single GenericRow object into a DataFrame so I can process it with a function I made.

For example, I would send the row to the function

processRow(row: Row)

which would take the value and the topic and turn them back into a single-row DF

+-----------------------+-----+
|                  value|topic|
+-----------------------+-----+
|{"big and nested json"}|  A  |
+-----------------------+-----+

for further processing
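For illustration, a hypothetical sketch of such a processRow (this only works where a SparkSession is available, i.e. on the driver inside foreachBatch, not inside an executor-side foreach):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical sketch: rebuild a single-row DF from one Row of the batch.
// Must run on the driver, because executors have no SparkSession.
def processRow(spark: SparkSession, row: Row): Unit = {
  val schema = StructType(Seq(
    StructField("value", StringType),
    StructField("topic", StringType)))
  val singleRowDF = spark.createDataFrame(
    spark.sparkContext.parallelize(Seq(Row(row.getAs[String]("value"), row.getAs[String]("topic")))),
    schema)
  singleRowDF.show(false) // further processing would go here
}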

Upvotes: -1

Views: 1856

Answers (2)

s.polam

Reputation: 10362

I guess you are consuming data from multiple Kafka topics at a time.

First you need to prepare schemas for all Kafka topics; here, for example, I have used two different JSON payloads in the value column.

scala> val df = Seq(("""{"name":"Srinivas"}""","A"),("""{"age":20}""","B")).toDF("value","topic")
scala> df.show(false)
+-------------------+-----+
|value              |topic|
+-------------------+-----+
|{"name":"Srinivas"}|A    |
|{"age":20}         |B    |
+-------------------+-----+
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.functions._

Schema for topic A

scala> val topicASchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]

Schema for topic B

scala> val topicBSchema = DataType.fromJson("""{"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]

Combining each topic with its schema.

scala> val topicSchema = Seq(("A",topicASchema),("B",topicBSchema)) // Adding Topic & Its Schema.

Processing the DataFrame

scala> topicSchema
.par
.map(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)))
.foreach(_.show(false)) // Using .par & filtering dataframe based on topic & then applying schema to value column.
+----------+-----+
|value     |topic|
+----------+-----+
|[Srinivas]|A    |
+----------+-----+

+-----+-----+
|value|topic|
+-----+-----+
|[20] |B    |
+-----+-----+

Writing to HDFS

scala> topicSchema
.par
.foreach(d => df.filter($"topic" === d._1).withColumn("value",from_json($"value",d._2)).write.format("json").save(s"/tmp/kafka_data/${d._1}"))

Final data stored in HDFS

scala> import sys.process._
import sys.process._

scala> "tree /tmp/kafka_data".!
/tmp/kafka_data
├── A
│   ├── part-00000-1e854106-49de-44b3-ab18-6c98a126c8ca-c000.json
│   └── _SUCCESS
└── B
    ├── part-00000-1bd51ad7-cfb6-4187-a374-4e2d4ce9cc50-c000.json
    └── _SUCCESS

2 directories, 4 files

Upvotes: 1

Hedrack

Reputation: 774

In this case it's better to use .map instead of .foreach. The reason is that map returns a new Dataset, while foreach is just an action and doesn't return anything.
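A minimal standalone illustration of the difference (names here are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("map-vs-foreach").master("local[*]").getOrCreate()
import spark.implicits._

val ds = Seq("a", "b").toDS()

// map transforms every element and returns a new Dataset that can be processed further...
val upper = ds.map(_.toUpperCase) // Dataset[String]
upper.show(false)

// ...whereas foreach is an action that returns Unit, so nothing can be chained after it.
ds.foreach((s: String) => println(s))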

One other thing that can help you is to let Spark parse the schema from the JSON itself.

I had a similar requirement recently. My JSON objects have a "similar" schema for both topic A and topic B. If that is not the case for you, you might need to create multiple DataFrames in the solution below by grouping them by topic (see the sketch at the end of this answer).

val sanitiseJson: String => String = value => value
  .replace("\\\"", "\"")
  .replace("\\\\", "\\")
  .replace("\n", "")
  .replace("\"{", "{")
  .replace("}\"", "}")

val parsed = df.toJSON
  .map(sanitiseJson)

This will give you something like:

{
    "value": { ... },
    "topic": "A"
}

Then you can pass that into a new read function:

val dfWithSchema = spark.read.json(parsed)

At this point you can access the value in the nested JSON:

dfWithSchema.select($"value.propertyInJson")

There are some optimizations you could apply to sanitiseJson if needed.
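If your topics do not share a similar schema, a rough sketch of the per-topic grouping mentioned above (topic names are placeholders; df and sanitiseJson are as defined earlier):

import spark.implicits._

// Hypothetical: build one DataFrame per topic so spark.read.json infers a schema per topic.
val topics = Seq("A", "B")
val dfByTopic = topics.map { t =>
  val parsedForTopic = df.filter($"topic" === t).toJSON.map(sanitiseJson)
  t -> spark.read.json(parsedForTopic)
}.toMap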

Upvotes: 0
