Lesha Pipiev

Reputation: 3333

OOM issue on partition shuffling

Given

The flow

  1. Read the ORC files into a DataFrame.
  2. Rename almost all columns.
  3. Project all columns into a single column named value that holds JSON of the form {"schema": [{"field": "field1", "optional": false, "type": "string"}, ...], "payload": {"field1": "val1", "field2": "val2", ...}}.
  4. Repartition into 5 partitions.
  5. Write the partitioned data into ORC files on S3.

Codebase

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions._

    val dataFrame = spark
          .read
          .format("orc")
          .load(inputS3Path)

    val dfCols = dataFrame.columns.map(colName => col(colName))

    val payloadStruct = struct(dfCols: _*)

    val connectSchema = prepareConnectSchema(dataFrame.schema)

    dataFrame
        .withColumn("schema", from_json(lit(connectSchema), schema_of_json(connectSchema)))
        .withColumn("payload", payloadStruct)
        .withColumn("value", to_json(struct(col("schema"), col("payload"))))
        .select(col("value"))
        .repartition(5)
        .withColumn("_partition_column", concat_ws("_", lit("project-1"), spark_partition_id()))
        .write
        .mode(SaveMode.Overwrite)
        .partitionBy("_partition_column")
        .orc(outputS3Path)

    import java.util.Collections

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import org.apache.kafka.connect.data.Schema.Type.STRUCT
    import org.apache.kafka.connect.data.{Schema, SchemaBuilder}
    import org.apache.kafka.connect.json.JsonConverter
    import org.apache.spark.sql.types.{ArrayType, StructType}

    def prepareConnectSchema(structType: StructType): String = {
      val schema = buildConnectSchema(structType)
      val jsonConverter = new JsonConverter()
      jsonConverter.configure(Collections.singletonMap("schemas.enable", false), false)
      val jsonNode = jsonConverter.asJsonSchema(schema)
      val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
      objectMapper.writeValueAsString(jsonNode)
    }

    def buildConnectSchema(structType: StructType): Schema = {
      val builder = new SchemaBuilder(STRUCT)
      structType.foreach { field =>
        field.dataType match {
          case subStructType: StructType =>
            builder.field(field.name, buildConnectSchema(subStructType))
          case arrayType: ArrayType =>
            builder.field(field.name, SchemaBuilder.array(buildValueSchemaForArrayType(arrayType)))
          case _ =>
            builder.field(field.name, mapToKafkaType(field.dataType))
        }
      }
      builder.build()
    }
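
For completeness: mapToKafkaType and buildValueSchemaForArrayType are referenced above but not shown in the question. A minimal sketch of what they might look like, assuming a straightforward Spark-to-Connect type mapping and reusing the imports from the snippet above (hypothetical implementations, not the actual code):

    // Hypothetical helpers, assumed for illustration only.
    import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, IntegerType, LongType, StringType}

    // Map a primitive Spark SQL type to a Kafka Connect schema.
    def mapToKafkaType(dataType: DataType): Schema = dataType match {
      case StringType  => Schema.OPTIONAL_STRING_SCHEMA
      case IntegerType => Schema.OPTIONAL_INT32_SCHEMA
      case LongType    => Schema.OPTIONAL_INT64_SCHEMA
      case DoubleType  => Schema.OPTIONAL_FLOAT64_SCHEMA
      case BooleanType => Schema.OPTIONAL_BOOLEAN_SCHEMA
      case _           => Schema.OPTIONAL_STRING_SCHEMA // fall back to string for unmapped types
    }

    // Resolve the element schema for an array column.
    def buildValueSchemaForArrayType(arrayType: ArrayType): Schema = arrayType.elementType match {
      case subStruct: StructType => buildConnectSchema(subStruct)
      case elementType           => mapToKafkaType(elementType)
    }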

The execution plan.

== Physical Plan ==
AdaptiveSparkPlan (13)
+- == Current Plan ==
   Execute InsertIntoHadoopFsRelationCommand (8)
   +- WriteFiles (7)
      +- Sort (6)
         +- Project (5)
            +- ShuffleQueryStage (4)
               +- Exchange (3)
                  +- Project (2)
                     +- Scan orc  (1)
+- == Initial Plan ==
   Execute InsertIntoHadoopFsRelationCommand (12)
   +- WriteFiles (11)
      +- Sort (10)
         +- Project (9)
            +- Exchange (3)
               +- Project (2)
                  +- Scan orc  (1)

(1) Scan orc 
Output [2419]: [field1#0, field2#1, ...]
Batched: false
Location: InMemoryFileIndex [s3://s3-path]
ReadSchema: struct<field1:string, field2:string, ...>

(2) Project
Output [1]: [to_json((ignoreNullFields,true), struct(schema, [[[field1,null,false,int32,null],[field2,null,false,string,null], ...],false,struct], payload, struct(field1, field1#0, field2, field2#1, ...)) AS value#48481]
Input [2419]: [field1#0, field2#1, ...]

(3) Exchange
Input [1]: [value#48481]
Arguments: RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=72]

(4) ShuffleQueryStage
Output [1]: [value#48481]
Arguments: 0

(5) Project
Output [2]: [value#48481, concat_ws(_, project-1, cast(SPARK_PARTITION_ID() as string)) AS _partition_column#50921]
Input [1]: [value#48481]

(6) Sort
Input [2]: [value#48481, _partition_column#50921]
Arguments: [_partition_column#50921 ASC NULLS FIRST], false, 0

(7) WriteFiles
Input [2]: [value#48481, _partition_column#50921]

(8) Execute InsertIntoHadoopFsRelationCommand
Input: []
Arguments: s3://output-path/], Overwrite, [value, _partition_column]

(9) Project
Output [2]: [value#48481, concat_ws(_, project-1, cast(SPARK_PARTITION_ID() as string)) AS _partition_column#50921]
Input [1]: [value#48481]

(10) Sort
Input [2]: [value#48481, _partition_column#50921]
Arguments: [_partition_column#50921 ASC NULLS FIRST], false, 0

(11) WriteFiles
Input [2]: [value#48481, _partition_column#50921]

(12) Execute InsertIntoHadoopFsRelationCommand
Input: []
Arguments: s3://output-path/], Overwrite, [value, _partition_column]

(13) AdaptiveSparkPlan
Output: []
Arguments: isFinalPlan=false

Spark UI (screenshots omitted)

Thoughts

  1. I suspect that the OOM could be related to the executor's Peak JVM memory OnHeap and Peak Execution memory OnHeap: their sum (3 GB + 1.2 GB) is greater than the 3 GB allocated to the JVM (-Xms3072m, -Xmx3072m). (See the configuration sketch after this list for how these numbers relate to the executor memory settings.)

  2. On the other hand, the Spark UI summary shows that memory is not exceeded (screenshot omitted).

  3. I'm not sure that I understand the numbers correctly.
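
For reference, a minimal sketch of how the numbers in thought 1 map onto Spark's executor memory settings (the values below are assumptions for illustration, not taken from this job): spark.executor.memory sets the executor JVM heap (the -Xmx value), spark.executor.memoryOverhead is additional off-heap memory requested on top of the heap, and only spark.memory.fraction (0.6 by default) of the heap minus 300 MB is available for execution and storage.

    import org.apache.spark.sql.SparkSession

    // Hypothetical executor memory configuration (example values only).
    val spark = SparkSession.builder()
      .appName("orc-to-connect-json")                 // hypothetical app name
      .config("spark.executor.memory", "3g")          // executor JVM heap, i.e. -Xmx3072m
      .config("spark.executor.memoryOverhead", "1g")  // off-heap memory on top of the heap
      .config("spark.memory.fraction", "0.6")         // share of (heap - 300 MB) for execution + storage
      .getOrCreate()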

The question: how to fix the OOM issue?

Upvotes: 0

Views: 27

Answers (0)
