User

Reputation: 168

Create RDDs from main RDD

I have an RDD (RDD[(String, Iterable[Event])]) whose key represents a month of a year and whose values are the millions of events that occurred during that month.

I want to loop through each key and create an RDD of that key's events. Then I want to create an event RDD for each day of the current month's events so that I can send them to the relevant S3 location (the "directory" structure is bucketName/year/month/day).

The problem is that it seems you cannot create RDDs inside the foreach of another RDD. So I'm not sure how to achieve what I want without loading the entire main RDD into memory (which would certainly blow out the driver's memory and defeat the point of using Spark in the first place).

Maybe there's a way to achieve what I want using Spark; I'm just not knowledgeable enough with it to know, and was hoping someone here could help.

Here is the code I have at the moment:

 private def store(
    eventsByMonth: RDD[(String, Iterable[Event])]
  )(
    implicit sqlContext: SQLContext
  ): Try[Unit] =
    Try(
      eventsByMonth
        .foreach {
          case (_, events: Iterable[Event]) =>
            writeToS3Files(sqlContext.sparkContext.parallelize(events.toSeq))
        }
    )

  private def writeToS3Files(events: RDD[Event])(
    implicit sqlContext: SQLContext
  ): Try[Unit] =
    Try(
      // outputFilePath will contain the day that these events are related to.
      events.groupBy(_.outputFilePath).foreach {
        case (filePath: String, eventsForFile: Iterable[Event]) =>
          writeToS3File(filePath, sqlContext.sparkContext.parallelize(eventsForFile.toSeq))
      }
    )

  private def writeToS3File(filePath: String, events: RDD[Event]): Try[Unit] = {
    val fileNameWithPath = s"${filePath}${UUID.randomUUID().toString}.gz"

    Try(events.saveAsTextFile(fileNameWithPath, classOf[GzipCodec]))
  }

Upvotes: 0

Views: 122

Answers (1)

N.Hung

Reputation: 154

I assume there is some way to determine the day of the month on which an event happened (for example, day, of type Int, is a property of an event).

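You can transform the RDD[(String, Iterable[Event])] into a pair RDD[(K, V)] in which the key (K) is the month together with the day of the month on which an event happened, and the value (V) is the event itself. Grouping by that key then gives you all the events that happened on each day of each month, without creating any RDD inside another RDD's foreach. After that, you can easily dump the data into your database(s).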

val eventsByMonthAndDate = eventsByMonth.flatMap { case (month, events) => events.map(e => ((month, e.day), e)) }
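// groupByKey yields ((month, day), Iterable[Event]); writeToDB is your own function that stores one such group.
eventsByMonthAndDate.groupByKey().foreach(writeToDB)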
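
To make the re-keying step easier to try out without a cluster, here is a minimal sketch that uses plain Scala collections in place of the RDD. The Event case class, its day and payload fields, and the RekeyByDay object are assumptions made up for this illustration (the question does not show the real Event definition). The flatMap has the same shape as the one above; on a real RDD it runs on the executors, so no nested RDD is needed.

```scala
// Hypothetical stand-in for the real Event class, which the question does not show.
final case class Event(day: Int, payload: String)

object RekeyByDay {
  // Same shape as the flatMap in the answer: emit one ((month, day), event) pair per event.
  def rekey(eventsByMonth: Seq[(String, Iterable[Event])]): Seq[((String, Int), Event)] =
    eventsByMonth.flatMap { case (month, events) =>
      events.map(e => ((month, e.day), e))
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample data: month key -> events of that month.
    val eventsByMonth = Seq(
      "2016-01" -> Seq(Event(1, "a"), Event(1, "b"), Event(2, "c")),
      "2016-02" -> Seq(Event(1, "d"))
    )

    // Group the re-keyed pairs by (month, day) and keep only the events as values.
    val byDay = rekey(eventsByMonth)
      .groupBy(_._1)
      .map { case (key, pairs) => key -> pairs.map(_._2) }

    // Print one line per (month, day) group, sorted by key.
    byDay.toSeq.sortBy(_._1).foreach { case ((month, day), events) =>
      println(s"$month/$day -> ${events.map(_.payload).mkString(",")}")
    }
  }
}
```

Each resulting (month, day) key maps to exactly the events that belong under one bucketName/year/month/day location, so the write step only ever sees one day's events at a time.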

Upvotes: 1
