PiotrW

Reputation: 21

Apache Spark write to multiple outputs [different parquet schemas] without caching

I want to transform my input data (XML files) and produce 3 different outputs.

Each output will be in parquet format and will have a different schema/number of columns.

In my current solution, the data is stored in an RDD[Row], where each Row belongs to one of the three types and has a different number of fields. What I'm doing now is caching the RDD, then filtering it (using the field that tells me the record type) and saving the data using the following method:

var resultDF_1 = sqlContext.createDataFrame(filtered_data_1, schema_1)
resultDF_1.write.parquet(output_path_1)
...
// the same for filtered_data_2 and filtered_data_3

Is there any way to do this better, for example without caching the entire dataset in memory?

In MapReduce we have MultipleOutputs class and we can do it this way:

MultipleOutputs.addNamedOutput(job, "data_type_1", DataType1OutputFormat.class, Void.class, Group.class);
MultipleOutputs.addNamedOutput(job, "data_type_2", DataType2OutputFormat.class, Void.class, Group.class);
MultipleOutputs.addNamedOutput(job, "data_type_3", DataType3OutputFormat.class, Void.class, Group.class);
...
MultipleOutputs<Void, Group> mos = new MultipleOutputs<>(context);
mos.write("data_type_1", null, myRecordGroup1, filePath1);
mos.write("data_type_2", null, myRecordGroup2, filePath2);
...

Upvotes: 2

Views: 1424

Answers (2)

samthebest

Reputation: 31513

We had exactly this problem. To reiterate: we read 1000s of datasets into one RDD, all with different schemas (we used a nested Map[String, Any]), and wanted to write those 1000s of datasets out to different Parquet partitions in their respective schemas, all in a single embarrassingly parallel Spark stage.

Our initial approach indeed did the hacky thing of caching, but this meant (a) making 1000s of passes over the cached data and (b) hitting a lot of memory issues!

For a long time now I've wanted to bypass Spark's provided .parquet methods, go to the lower-level underlying libraries, and wrap that in a nice functional signature. Recently we finally did exactly that!

There is too much code to copy and paste all of it here, so I will just paste the main crux of it to explain how it works. We intend to make this code Open Source in the next year or two.

      val successFiles: List[String] = successFilePaths(tableKeyToSchema, tableKeyToOutputKey, tableKeyToOutputKeyNprs)

      // MUST happen first
      info("Deleting success files")
      successFiles.foreach(S3Utils.deleteObject(bucket, _))

      if (saveMode == SaveMode.Overwrite) {
        info("Deleting past files as in Overwrite mode")
        parDeleteDirContents(bucket, allDirectories(tableKeyToOutputKey, tableKeyToOutputKeyNprs, partitions, continuallyRunStartTime))
      } else {
        info("Not deleting past files as in Append mode")
      }

      rdd.mapPartitionsWithIndex {
        case (index, records) =>
          records.toList.groupBy(_._1).mapValues(_.map(_._2)).foreach {
            case (regularKey: RegularKey, data: List[NotProcessableRecord Either UntypedStruct]) =>
              val (nprs: List[NotProcessableRecord], successes: List[UntypedStruct]) =
                Foldable[List].partitionEither(data)(identity)

              val filename = s"part-by-partition-index-$index.snappy.parquet"

              Parquet.writeUntypedStruct(
                data = successes,
                schema = toMessageType(tableKeyToSchema(regularKey.tableKey)),
                fsMode = fs,
                path = s3 / bucket / tableKeyToOutputKey(regularKey.tableKey) / regularKey.partition.pathSuffix /?
                  continuallyRunStartTime.map(hourMinutePathSuffix) / filename
              )

              Parquet.writeNPRs(
                nprs = nprs,
                fsMode = fs,
                path = s3 / bucket / tableKeyToOutputKeyNprs(regularKey.tableKey) / regularKey.partition.pathSuffix /?
                  continuallyRunStartTime.map(hourMinutePathSuffix) / filename
              )
          } pipe Iterator.single
      }.count() // Just some action to force execution

      info("Writing _SUCCESS files")
      successFiles.foreach(S3Utils.uploadFileContent(bucket, "", _))

Of course this code cannot be copied and pasted as-is, since many methods and values are not provided. The key points are:

  1. We hand-crank the deletion of _SUCCESS files and of previous files when overwriting
  2. Each Spark partition will result in one or many output files (many when multiple data schemas are in the same partition)
  3. We hand-crank the writing of _SUCCESS files (a minimal sketch of the _SUCCESS handling follows below)
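
For illustration only, the hand-cranked _SUCCESS handling can be done with the plain AWS SDK. The sketch below is an assumption of roughly what our S3Utils helpers do (the object and method names here are made up), not the actual code:

import com.amazonaws.services.s3.AmazonS3ClientBuilder

object S3SuccessMarkers {
  // Plain AWS SDK v1 client; credentials come from the default provider chain
  private val s3 = AmazonS3ClientBuilder.defaultClient()

  // Delete a stale _SUCCESS marker before the job starts writing
  def deleteSuccessFile(bucket: String, prefix: String): Unit =
    s3.deleteObject(bucket, s"$prefix/_SUCCESS")

  // Upload an empty _SUCCESS marker once every partition has been written
  def writeSuccessFile(bucket: String, prefix: String): Unit =
    s3.putObject(bucket, s"$prefix/_SUCCESS", "")
}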

Notes:

  • UntypedStruct is our nested representation of an arbitrary schema. It's a little bit like Spark's Row but much better, as it's based on Map[String, Any] (a rough sketch of the idea follows below).
  • NotProcessableRecords are essentially just dead letters.
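
Just to give a flavour of UntypedStruct (this is only a minimal sketch of the idea, not our actual implementation), something along these lines:

// Hypothetical sketch: a schemaless nested record backed by Map[String, Any]
final case class UntypedStruct(fields: Map[String, Any]) {
  // Values may be primitives, Strings, Seq[Any], or nested UntypedStructs
  def get(name: String): Option[Any] = fields.get(name)
}

// e.g. UntypedStruct(Map("id" -> 1, "address" -> UntypedStruct(Map("city" -> "London"))))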

Parquet.writeUntypedStruct is the crux of the logic for writing a parquet file, so we'll explain it in more detail. Firstly,

val toMessageType: StructType => MessageType = new org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter().convert

This should be self-explanatory. Next, fsMode contains within it the com.amazonaws.auth.AWSCredentials; inside writeUntypedStruct we use those to construct an org.apache.hadoop.conf.Configuration, setting fs.s3a.access.key and fs.s3a.secret.key.
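
In other words (a rough sketch, not the actual code; credentials stands for the AWSCredentials carried inside fsMode), the Configuration is built along these lines:

import com.amazonaws.auth.AWSCredentials
import org.apache.hadoop.conf.Configuration

// Build a Hadoop Configuration whose s3a filesystem uses the given credentials
def s3aConfiguration(credentials: AWSCredentials): Configuration = {
  val config = new Configuration()
  config.set("fs.s3a.access.key", credentials.getAWSAccessKeyId)
  config.set("fs.s3a.secret.key", credentials.getAWSSecretKey)
  config
}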

writeUntypedStruct basically just calls out to:

  def writeRaw(
    data: List[UntypedStruct],
    schema: MessageType,
    config: Configuration,
    path: Path,
    compression: CompressionCodecName = CompressionCodecName.SNAPPY
  ): Unit =
    // Using.resource closes the ParquetWriter even if one of the writes throws
    Using.resource(
      ExampleParquetWriter.builder(path)
        .withType(schema)
        .withConf(config)
        .withCompressionCodec(compression)
        .withValidation(true)
        .build()
    )(writer => data.foreach(record => writer.write(transpose(record, new SimpleGroup(schema)))))

where SimpleGroup comes from org.apache.parquet.example.data.simple, and ExampleParquetWriter extends ParquetWriter<Group>. The method transpose is a very tedious hand-written recursion through the UntypedStruct that populates a Group (an ugly, mutable, low-level Java thing).
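
To give an idea of its shape (a simplified sketch only, assuming UntypedStruct wraps a Map[String, Any] as sketched above; the real thing handles many more types), transpose looks something like:

import org.apache.parquet.example.data.Group

// Recursively copy an UntypedStruct's fields into a mutable parquet Group
def transpose(struct: UntypedStruct, group: Group): Group = {
  struct.fields.foreach {
    case (name, value: Int)            => group.add(name, value)
    case (name, value: Long)           => group.add(name, value)
    case (name, value: Double)         => group.add(name, value)
    case (name, value: Boolean)        => group.add(name, value)
    case (name, value: String)         => group.add(name, value)
    case (name, nested: UntypedStruct) => transpose(nested, group.addGroup(name))
    case (name, other)                 => group.add(name, other.toString)
  }
  group
}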

Credit must go to https://github.com/davidainslie for figuring out how these underlying libraries work and labouring over the code, which, like I said, we intend to make Open Source soon!

Upvotes: 1

Leighton Ritchie

Reputation: 501

AFAIK, there is no way to split one RDD into multiple RDDs per se. This is just how Spark's DAG works: child RDDs can only pull data from their parent RDDs.

We can, however, have multiple child RDDs read from the same parent RDD, and to avoid recomputing the parent RDD there is no other way but to cache it. I assume you want to avoid caching because you're afraid of running out of memory. We can avoid Out Of Memory (OOM) issues by persisting the RDD with the MEMORY_AND_DISK storage level, so that a large RDD will spill to disk if and when needed.

Let's begin with your original data:

val allDataRDD = sc.parallelize(Seq(Row(1,1,1),Row(2,2,2),Row(3,3,3)))

We can persist this in memory first, but allow it to spill over to disk in case of insufficient memory:

allDataRDD.persist(StorageLevel.MEMORY_AND_DISK)

We then create the 3 RDD outputs:

val filtered_data_1 = allDataRDD.filter(_.getInt(0) == 1) // use your own filter
val filtered_data_2 = allDataRDD.filter(_.getInt(0) == 2) // functions here, e.g. on
val filtered_data_3 = allDataRDD.filter(_.getInt(0) == 3) // the record-type field

We then write the outputs:

var resultDF_1 = sqlContext.createDataFrame(filtered_data_1, schema_1)
resultDF_1.write.parquet(output_path_1)
var resultDF_2 = sqlContext.createDataFrame(filtered_data_2, schema_2)
resultDF_2.write.parquet(output_path_2)
var resultDF_3 = sqlContext.createDataFrame(filtered_data_3, schema_3)
resultDF_3.write.parquet(output_path_3)

If you truly want to avoid multiple passes, there is a workaround using a custom partitioner: repartition your data into 3 partitions, so that each partition gets its own task and hence its own output file/part. The caveats are that parallelism is heavily reduced to 3 threads/tasks, and that there is a risk of more than 2GB of data landing in a single partition (Spark has a 2GB limit per partition). I am not providing detailed code for this method because I don't think it can write parquet files with different schemas.
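
For illustration only, the partitioning part alone (not the per-schema parquet writing, which is the real sticking point) could look something like the sketch below, assuming the record type lives in field 0 of each Row:

import org.apache.spark.Partitioner

// Route each record type (1, 2 or 3) to its own partition, and hence its own task/output part
class RecordTypePartitioner extends Partitioner {
  override def numPartitions: Int = 3
  override def getPartition(key: Any): Int = key.asInstanceOf[Int] - 1
}

// Key by the record-type field, repartition, then drop the key again
val partitionedRDD = allDataRDD
  .keyBy(_.getInt(0))
  .partitionBy(new RecordTypePartitioner)
  .values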

Upvotes: 0
