user3030878

Reputation: 275

Filtering an RDD[T] to a subclass of type T

I am using Spark to read in a text file. Each line can belong to a different case class. Once I have converted the lines to objects described by the case classes, I will convert them to a dataframe and write to HDFS (parquet). The problem I have is that I end up with an RDD of the abstract type, and I need to constrain it to a specific case class type to apply the toDF function.

So far I've defined my log events as follows:

abstract class LogEvent
final case class Web(datetime: String, ... ) extends LogEvent
final case class OtherEvent(datetime: String ...) extends LogEvent

I am reading in my text file, then mapping lines against a pattern match function to create an RDD[LogEvent]:

def convertToCase(e: List[String]): LogEvent = e match {
  case List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) =>
    Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
  case List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) =>
    OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
}

At this point I wish to constrain to a given case class and convert to Spark dataframe. Something like:

val events = spark.read.textFile(...)
  .map(_.split(',').toList)
  .map(convertToCase)

I then want to reduce the RDD[LogEvent] down to an RDD of type T, which could be in the set {Web, OtherEvent}. This is what I'm struggling with. Applying a filter with a predicate to constrain to case class doesn't change the type from LogEvent, which means I cannot call 'toDF()' as this must be called on RDD[T] where T is a specific case class, not the abstract class RDD[LogEvent].

val webEvents = events.filter(someLogic).toDF()

I'm looking for a way that I can reduce the generic RDD down to an RDD of a specific case class. I'm trying to achieve this whilst maintaining type safety by not using isInstanceOf or asInstanceOf.

Is there a simple solution to this? Or am I approaching the problem in the wrong way?

Thanks in advance.

Upvotes: 2

Views: 472

Answers (1)

Cyrille Corpet

Reputation: 5315

You should use the collect(f: PartialFunction[T, U]): RDD[U] method (not to be confused with collect(): Array[T], which returns the results to the driver as an array):

val webEvents = events.collect{
  case w: Web => w 
}.toDF()

collect is a mix of map and filter: if the input matches one of the cases in the pattern match, it outputs the value given by the partial function; otherwise, it simply ignores (i.e. filters out) the input.
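The same narrowing can be seen with plain Scala collections, which offer collect with the same partial-function signature (simplified, hypothetical event classes for illustration):

```scala
sealed abstract class LogEvent
final case class Web(datetime: String) extends LogEvent
final case class OtherEvent(datetime: String) extends LogEvent

val events: List[LogEvent] =
  List(Web("2017-01-01"), OtherEvent("2017-01-02"), Web("2017-01-03"))

// collect keeps only the inputs the partial function is defined for,
// and narrows the static type from LogEvent to Web
val webEvents: List[Web] = events.collect { case w: Web => w }
```

No isInstanceOf or asInstanceOf is needed: the type test lives in the pattern, and the compiler infers the narrowed element type.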

Note that you should probably also do this for your convertToCase, since the pattern match you defined is not exhaustive, and you might get an error at runtime if you encounter an unexpected event or a corrupted row. The correct way to do this would be to define

val convertToCase: PartialFunction[List[String], LogEvent] = {
  case e @ List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) =>
    Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
  case e @ List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) =>
    OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
}

And then to replace map(convertToCase) with collect(convertToCase).
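Putting the two collect stages together, here is a minimal end-to-end sketch on plain Scala collections (hypothetical simplified event classes and a three-field row format; on an RDD the collect calls have the same shape):

```scala
sealed abstract class LogEvent
final case class Web(datetime: String, url: String) extends LogEvent
final case class OtherEvent(datetime: String, info: String) extends LogEvent

// Stage 1: parse rows, silently dropping corrupted or unknown ones
val convertToCase: PartialFunction[List[String], LogEvent] = {
  case List(dt, "WEB", url)         => Web(dt, url)
  case List(dt, "OTHEREVENT", info) => OtherEvent(dt, info)
}

val lines = List("2017-01-01,WEB,/home", "2017-01-02,OTHEREVENT,x", "bad row")

val events: List[LogEvent] =
  lines.map(_.split(',').toList).collect(convertToCase)

// Stage 2: narrow LogEvent down to the concrete case class
val webEvents: List[Web] = events.collect { case w: Web => w }
```

With an RDD, `webEvents` would be an RDD[Web], on which toDF() can be called directly.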

Upvotes: 4
