Brian Yeh
Brian Yeh

Reputation: 3267

How to filter by Optional types in scala spark dataSets

The code below I get a value from kafka as an array of bytes. ToJobEvent converts those bytes into Option[JobEvent], then I want to filter the Nones out of the JobEvent then finally extract the JobEvent out of the Maybe monad. What is the proper way to do this in scala spark?

val jobEventDS = kafkaDS
      .select($"value".as[Array[Byte]])
      .map(binaryData => FromThrift.ToJobEvent(binaryData))
      .filter(MaybeJobEvent => MaybeJobEvent match {
        case Some(_) => true
        case None => false
      }).map {
      case Some(jobEvent) => jobEvent
      case None => null
    }

The above code doesn't work. Just an example I want to get working.

Upvotes: 2

Views: 1611

Answers (1)

Gelerion
Gelerion

Reputation: 1724

The first option is to use flatMap

df.select($"value".as[Array[Byte]])
      .flatMap(binaryData => FromThrift.ToJobEvent(binaryData) match {
          case Some(value) => Seq(value)
          case None => Seq()
        }
      })

The second is to use Tuple1 as a holder

df.select($"value".as[Array[Byte]])
      .map(binaryData => {
        BinaryHolder(binaryData).toCaseClassMonad() match {
          case Some(value) => Tuple1(value)
          case None => Tuple1(null)
        }
      })
      .filter(tuple => tuple._1 != null)
      .map(tuple => tuple._1)

A bit of explanation.

If yous MaybeJobEvent is a case class or instance of Product, Spark won't be able to handle it.
See here.

Cannot create encoder for Option of Product type, because Product type is represented as a row, and the entire row can not be null in Spark SQL like normal databases. You can wrap your type with Tuple1 if you do want top level null Product objects, e.g. instead of creating Dataset[Option[MyClass]], you can do something like val ds: Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)), Tuple1(null)).toDS

Some examples:

case class BinaryHolder(value: Array[Byte]) {
  def toStrMonad(): Option[String] = new String(value) match {
    case "abc" => None
    case _ => Some(new String(value))
  }

  def toCaseClassMonad(): Option[MyString] = new String(value) match {
    case "abc" => None
    case _ => Some(MyString(new String(value)))
  }
}

//case classe is also Product 
case class MyString(str: String)

Creating Dataset:

val ds = List(
      BinaryHolder("abc".getBytes()),
      BinaryHolder("dbe".getBytes()),
      BinaryHolder("aws".getBytes()),
      BinaryHolder("qwe".getBytes())
    ).toDS()

This works fine:

val df: DataFrame = ds.toDF()
    df.select($"value".as[Array[Byte]])
      .map(binaryData => {
        BinaryHolder(binaryData).toStrMonad()
      })
      .show()

+-----+
|value|
+-----+
| null|
|  dbe|
|  aws|
|  qwe|
+-----+

But this fails with the exception

df.select($"value".as[Array[Byte]])
      .map(binaryData => {
        //Option[MyString]
        BinaryHolder(binaryData).toCaseClassMonad()
      })
      .show()

UnsupportedOperationException: Cannot create encoder for Option of Product type...

Returning nulls for a typed dataset also won't work

 df.select($"value".as[Array[Byte]])
      .map(binaryData => {
        BinaryHolder(binaryData).toCaseClassMonad() match {
          case Some(value) => value
          case None => null
        }
      })

throws

java.lang.NullPointerException: Null value appeared in non-nullable field:
top level Product input object
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

Upvotes: 3

Related Questions