Reputation: 43
I'm trying to create a Spark Streaming job that consumes Kafka messages encoded in ProtoBuf.
Here is what I have tried over the last few days:
import spark.implicits._

def parseLine(str: Array[Byte]): ProtoSchema = ProtoSchema.parseFrom(str)

val storageLoc: String = "/tmp/avl/output"
val checkpointLoc: String = "/tmp/avl/checkpoint"

val dfStreamReader: DataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("failOnDataLoss", value = false)
  .option("subscribe", topics)
  .load()

val dfStreamReaderValues: Dataset[Array[Byte]] = dfStreamReader.map(row => row.getAs[Array[Byte]]("value"))
val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
val dfRaw: DataFrame = spark.sqlContext.protoToDataFrame(rddProtoSchema.rdd)

val streamWriterAirline: StreamingQuery = dfRaw.writeStream
  .format("parquet")
  .option("path", storageLoc)
  .option("checkpointLocation", checkpointLoc)
  .outputMode(Append)
  .trigger(ProcessingTime("2 seconds"))
  .start()

spark.streams.awaitAnyTermination(20000)
With ScalaPB, I managed to decode a binary proto file and convert it to a DataFrame. But with streaming, I get this exception at compile time on the parsing line:
val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
>>>>>
scala.ScalaReflectionException: <none> is not a term
Can anyone give me a hint?
Upvotes: 3
Views: 4436
Reputation: 6582
UPDATE: sparksql-scalapb is now able to derive encoders for protocol buffers, and the previous approach with the UDT generators is no longer needed. Instructions are available here.
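For reference, here is a minimal sketch of the encoder-based approach, assuming a recent sparksql-scalapb release whose scalapb.spark.Implicits provides encoders for the generated message classes (check the linked instructions for the version matching your Spark; ProtoSchema and dfStreamReader are the names from the question):

import scalapb.spark.Implicits._

// Map the raw Kafka value bytes directly to the generated message type;
// the Dataset encoder for ProtoSchema is supplied by scalapb.spark.Implicits.
val parsed: Dataset[ProtoSchema] =
  dfStreamReader.map(row => ProtoSchema.parseFrom(row.getAs[Array[Byte]]("value")))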
Old answer (now irrelevant): When using Datasets, Spark tries to find a SQL type for each of the fields in your message. Spark does not know how to deal with ScalaPB enums (they are represented as a sealed trait extended by case objects), and therefore it gives this error. The way around this is to register the enums as user-defined types. This can be done as follows:
Add the following to project/plugins.sbt (not to your main build.sbt):

libraryDependencies += "com.thesamet.scalapb" %% "sparksql-scalapb-gen" % "0.8.1"
Check that the version above matches the version of sparksql-scalapb you are using.
Add the UDT generator to PB.targets in build.sbt:

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value,
  scalapb.UdtGenerator -> (sourceManaged in Compile).value
)
Regenerate the sources (this may require sbt clean followed by sbt compile).
Call the generated registration function in your main function. It would be something like mypackage.MyFileUdt.register().
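As a rough sketch of where that call goes (mypackage.MyFileUdt is the placeholder name from above; substitute the object generated from your own .proto file):

import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  // Register the ScalaPB user-defined types before building any Datasets of proto messages.
  mypackage.MyFileUdt.register()

  val spark = SparkSession.builder().appName("proto-stream").getOrCreate()
  // ... build the streaming query as in the question ...
}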
See: https://scalapb.github.io/sparksql.html#datasets-and-none-is-not-a-term
Example project: https://github.com/thesamet/sparksql-scalapb-test
Upvotes: 1