K P

Reputation: 851

How to define the schema of a streaming Dataset dynamically to write to CSV?

I have a streaming Dataset, read from Kafka, that I am trying to write to CSV:

import org.apache.spark.sql.Dataset
import spark.implicits._ // needed for .as[Array[Byte]] and the Event encoder

case class Event(map: Map[String, String])

def decodeEvent(arrByte: Array[Byte]): Event = ... // some implementation

val eventDataset: Dataset[Event] = spark
  .readStream
  .format("kafka")
  .load()
  .select("value")
  .as[Array[Byte]]
  .map(decodeEvent)

Event holds a Map[String, String] inside, and to write it to CSV I need a schema.
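For concreteness, a decoded event might look like this (hypothetical values; the real decoding is application-specific):

val sample = Event(Map(
  "year" -> "2018", "month" -> "01", "date" -> "15",
  "topic" -> "some-topic", "field1" -> "a", "field2" -> "b"
))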

Let's say all the fields are of type String, so I tried the example from the Spark repo:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

val columns = List("year", "month", "date", "topic", "field1", "field2")
// Prepare schema programmatically; StructType is immutable, so fold rather
// than calling add in a foreach (add returns a new StructType)
val schema = columns.foldLeft(new StructType())((acc, field) => acc.add(field, "string"))
val rowRdd = eventDataset.rdd.map { event =>
  Row.fromSeq(columns.map(c => event.map.getOrElse(c, "")))
}
val df = spark.sqlContext.createDataFrame(rowRdd, schema)

This fails at runtime on eventDataset.rdd, because a streaming Dataset cannot be converted to an RDD:

Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

The following doesn't work either, because .map produces a List[String] rather than a tuple or case class, so there is no implicit encoder for it:

eventDataset
  .map(event => columns.map(c => event.map.getOrElse(c, "")))
  .toDF(columns: _*)

Is there a way to achieve this with a programmatic schema and Structured Streaming Datasets?

Upvotes: 3

Views: 1173

Answers (1)

Alper t. Turker

Reputation: 35249

I'd use a much simpler approach:

import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." column syntax

eventDataset.select(columns.map(
  c => coalesce($"map".getItem(c), lit("")).alias(c)
): _*).writeStream.format("csv").start(path)
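One caveat not shown above: Structured Streaming's file sinks require a checkpoint location, so in practice the query needs one more option. A minimal sketch with hypothetical paths:

eventDataset.select(columns.map(
  c => coalesce($"map".getItem(c), lit("")).alias(c)
): _*)
  .writeStream
  .format("csv")
  .option("checkpointLocation", "/tmp/checkpoints") // hypothetical path
  .start("/tmp/csv-out")                            // hypothetical path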

but if you want something closer to your current solution, skip the RDD conversion and use Dataset.map with an explicit RowEncoder:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// map directly on the streaming Dataset, passing the Row encoder explicitly
eventDataset.map(event =>
  Row.fromSeq(columns.map(c => event.map.getOrElse(c, "")))
)(RowEncoder(schema)).writeStream.format("csv").start(path)
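Dataset.map stays inside the streaming plan, so it avoids the AnalysisException that .rdd raises, and the explicit RowEncoder(schema) supplies the Encoder[Row] that map needs (Spark cannot derive one for Row automatically). Here columns and schema are the ones defined in the question; for completeness, an equivalent one-liner for the schema:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(columns.map(StructField(_, StringType)))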

Upvotes: 2
