Reputation: 3267
In the code below, I get a value from Kafka as an array of bytes. FromThrift.ToJobEvent converts those bytes into an Option[JobEvent]; I then want to filter out the Nones and finally extract the JobEvent from the Option (Scala's Maybe monad). What is the proper way to do this in Scala Spark?
val jobEventDS = kafkaDS
  .select($"value".as[Array[Byte]])
  .map(binaryData => FromThrift.ToJobEvent(binaryData))
  .filter(MaybeJobEvent => MaybeJobEvent match {
    case Some(_) => true
    case None => false
  })
  .map {
    case Some(jobEvent) => jobEvent
    case None => null
  }
The above code doesn't work; it's just an example of what I want to get working.
Upvotes: 2
Views: 1611
Reputation: 1724
The first option is to use flatMap
df.select($"value".as[Array[Byte]])
.flatMap(binaryData => FromThrift.ToJobEvent(binaryData) match {
case Some(value) => Seq(value)
case None => Seq()
}
})
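Since an Option is effectively a collection of zero or one elements, the match can also be collapsed with toList (FromThrift.ToJobEvent is the question's function, assumed to return Option[JobEvent]):
df.select($"value".as[Array[Byte]])
  .flatMap(binaryData => FromThrift.ToJobEvent(binaryData).toList)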
The second is to use Tuple1 as a holder:
df.select($"value".as[Array[Byte]])
.map(binaryData => {
BinaryHolder(binaryData).toCaseClassMonad() match {
case Some(value) => Tuple1(value)
case None => Tuple1(null)
}
})
.filter(tuple => tuple._1 != null)
.map(tuple => tuple._1)
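The filter drops the Tuple1(null) rows and the last map unwraps the tuple, leaving a typed Dataset of the case class itself (here a Dataset[MyString], given the classes defined below).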
A bit of explanation.
If your MaybeJobEvent is an Option of a case class (or of any other instance of Product), Spark won't be able to handle it.
Spark's own exception message explains why:
Cannot create encoder for Option of Product type, because Product type is represented as a row, and the entire row can not be null in Spark SQL like normal databases. You can wrap your type with Tuple1 if you do want top level null Product objects, e.g. instead of creating Dataset[Option[MyClass]], you can do something like
val ds: Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)), Tuple1(null)).toDS
Some examples:
case class BinaryHolder(value: Array[Byte]) {
  def toStrMonad(): Option[String] = new String(value) match {
    case "abc" => None
    case _ => Some(new String(value))
  }

  def toCaseClassMonad(): Option[MyString] = new String(value) match {
    case "abc" => None
    case _ => Some(MyString(new String(value)))
  }
}
// a case class is also a Product
case class MyString(str: String)
Creating a Dataset (with import spark.implicits._ in scope for toDS()):
val ds = List(
  BinaryHolder("abc".getBytes()),
  BinaryHolder("dbe".getBytes()),
  BinaryHolder("aws".getBytes()),
  BinaryHolder("qwe".getBytes())
).toDS()
This works fine:
val df: DataFrame = ds.toDF()

df.select($"value".as[Array[Byte]])
  .map(binaryData => {
    BinaryHolder(binaryData).toStrMonad()
  })
  .show()
+-----+
|value|
+-----+
| null|
| dbe|
| aws|
| qwe|
+-----+
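This works because Option[String] is encoded as an ordinary nullable string column, so the None simply shows up as null; the restriction only applies to an Option of a Product type at the top level of the Dataset.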
But this fails with an exception:
df.select($"value".as[Array[Byte]])
.map(binaryData => {
//Option[MyString]
BinaryHolder(binaryData).toCaseClassMonad()
})
.show()
UnsupportedOperationException: Cannot create encoder for Option of Product type...
Returning nulls in a typed dataset won't work either:
df.select($"value".as[Array[Byte]])
.map(binaryData => {
BinaryHolder(binaryData).toCaseClassMonad() match {
case Some(value) => value
case None => null
}
})
throws
java.lang.NullPointerException: Null value appeared in non-nullable field:
top level Product input object
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
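Putting it together for the question's pipeline, a minimal sketch using the flatMap approach (kafkaDS and FromThrift.ToJobEvent are taken from the question; this assumes JobEvent is a case class with an encoder available via import spark.implicits._):
val jobEventDS = kafkaDS
  .select($"value".as[Array[Byte]])
  // None rows disappear and Some values are unwrapped in a single step
  .flatMap(binaryData => FromThrift.ToJobEvent(binaryData).toList)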
Upvotes: 3