The flow
{"schema": [{"field": "field1", "optional": false, "type": "string"}, ...], "payload": {"field1": "val1", "field2": "val2", ...}}
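To make the target shape concrete, here is a minimal plain-Scala sketch that renders one such envelope for a single row. The `Field` case class, the `EnvelopeSketch` object and the `"field"` key name are assumptions made for illustration (the key name follows the Kafka Connect JSON schema convention); this is not the Spark code used in the job.

```scala
// Hypothetical helper type for illustration; not part of the original job.
final case class Field(name: String, connectType: String, optional: Boolean = false)

object EnvelopeSketch {
  // Quote a string as a JSON string literal (escapes only backslash and double quote).
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Render the "schema" array: one entry per field.
  def schemaJson(fields: Seq[Field]): String =
    fields.map { f =>
      s"""{"field": ${quote(f.name)}, "optional": ${f.optional}, "type": ${quote(f.connectType)}}"""
    }.mkString("[", ", ", "]")

  // Render the "payload" object; every value is treated as a string in this sketch.
  def payloadJson(row: Seq[(String, String)]): String =
    row.map { case (k, v) => s"${quote(k)}: ${quote(v)}" }.mkString("{", ", ", "}")

  // Combine both parts into one envelope, as it would appear in a single output record.
  def envelope(fields: Seq[Field], row: Seq[(String, String)]): String =
    s"""{"schema": ${schemaJson(fields)}, "payload": ${payloadJson(row)}}"""
}
```

Note that the schema part is serialized into every record, so with the 2419 input columns listed in the execution plan further down, each `value` string carries the full schema in addition to the payload.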
val dataframe = spark
val dfCols = dataframe.columns.map(colName => col(s"$colName"))
val payloadStruct = struct(dfCols: _*)
val connectSchema = prepareSchema(dataframe.schema)
.withColumn("schema", from_json(lit(connectSchema), schema_of_json(connectSchema)))
.withColumn("payload", payloadStruct)
.withColumn("value", to_json(struct(col("schema"), col("payload"))))
.withColumn("_partition_column", concat_ws("_", lit("project-1"), lit(spark_partition_id())))
def prepareSchema(structType: StructType): String = {
val schema = buildSchema(structType)
val jsonConverter = new JsonConverter()
jsonConverter.configure(Collections.singletonMap("schemas.enable", false), false)
val jsonNode = jsonConverter.asJsonSchema(schema)
val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
def buildSchema(structType: StructType): Schema = {
val builder = new SchemaBuilder(STRUCT)
structType.foreach(field => {
field.dataType match {
case subStructType: StructType => val intermediateSchema = buildSchema(subStructType)
builder.field(field.name, intermediateSchema)
case arrayType: ArrayType => val intermediateSchema = SchemaBuilder.array(buildValueSchemaForArrayType(arrayType))
builder.field(field.name, intermediateSchema)
case _ => builder.field(field.name, mapToKafkaType(field.dataType))
The execution plan.
== Physical Plan ==
AdaptiveSparkPlan (13)
+- == Current Plan ==
   Execute InsertIntoHadoopFsRelationCommand (8)
   +- WriteFiles (7)
      +- Sort (6)
         +- Project (5)
            +- ShuffleQueryStage (4)
               +- Exchange (3)
                  +- Project (2)
                     +- Scan orc (1)
+- == Initial Plan ==
   Execute InsertIntoHadoopFsRelationCommand (12)
   +- WriteFiles (11)
      +- Sort (10)
         +- Project (9)
            +- Exchange (3)
               +- Project (2)
                  +- Scan orc (1)
(1) Scan orc
Output [2419]: [field1#0, field2#1, ...]
Batched: false
Location: InMemoryFileIndex [s3://s3-path]
ReadSchema: struct<field1:string, field2:string, ...>
(2) Project
Output [1]: [to_json((ignoreNullFields,true), struct(schema, [[[field1,null,false,int32,null],[field2,null,false,string,null], ...],false,struct], payload, struct(field1, field1#0, field2, field2#1, ...)) AS value#48481]
Input [2419]: [field1#0, field2#1, ...]
(3) Exchange
Input [1]: [value#48481]
Arguments: RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=72]
(4) ShuffleQueryStage
Output [1]: [value#48481]
Arguments: 0
(5) Project
Output [2]: [value#48481, concat_ws(_, project-1, cast(SPARK_PARTITION_ID() as string)) AS _partition_column#50921]
Input [1]: [value#48481]
(6) Sort
Input [2]: [value#48481, _partition_column#50921]
Arguments: [_partition_column#50921 ASC NULLS FIRST], false, 0
(7) WriteFiles
Input [2]: [value#48481, _partition_column#50921]
(8) Execute InsertIntoHadoopFsRelationCommand
Input: []
Arguments: s3://output-path/], Overwrite, [value, _partition_column]
(9) Project
Output [2]: [value#48481, concat_ws(_, project-1, cast(SPARK_PARTITION_ID() as string)) AS _partition_column#50921]
Input [1]: [value#48481]
(10) Sort
Input [2]: [value#48481, _partition_column#50921]
Arguments: [_partition_column#50921 ASC NULLS FIRST], false, 0
(11) WriteFiles
Input [2]: [value#48481, _partition_column#50921]
(12) Execute InsertIntoHadoopFsRelationCommand
Input: []
Arguments: s3://output-path/], Overwrite, [value, _partition_column]
(13) AdaptiveSparkPlan
Output: []
Arguments: isFinalPlan=false
I suspect that the OOM issue is related to the executor's Peak JVM Memory OnHeap and Peak Execution Memory OnHeap, whose sum (3 GB + 1.2 GB) is greater than the 3 GB allocated to the JVM (-Xms3072m, -Xmx3072m).
On the other hand, the Spark UI summary shows that the memory limit is not exceeded.
I'm not sure I'm reading these numbers correctly.
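One way to sanity-check these numbers, as a rough sketch: on-heap execution memory is allocated inside the JVM heap, so the two peaks are probably not additive, and the 1.2 GB may already be counted in the 3 GB heap figure. The snippet below estimates how large Spark's unified (execution + storage) region is for a 3072 MiB heap. It assumes the documented defaults `spark.memory.fraction = 0.6` and `spark.memory.storageFraction = 0.5` and the fixed 300 MiB reserve; the actual settings of this job are not shown here, so treat the inputs as assumptions. `ExecutorMemorySketch` is a hypothetical name.

```scala
object ExecutorMemorySketch {
  // Spark sets aside a fixed 300 MiB of heap before applying spark.memory.fraction.
  val ReservedMiB: Double = 300.0

  // Size of the unified (execution + storage) region for a given heap size.
  // memoryFraction mirrors spark.memory.fraction (assumed default 0.6).
  def unifiedMiB(heapMiB: Double, memoryFraction: Double = 0.6): Double =
    (heapMiB - ReservedMiB) * memoryFraction

  // Part of the unified region that is shielded from eviction for storage.
  // storageFraction mirrors spark.memory.storageFraction (assumed default 0.5).
  def storageMiB(heapMiB: Double,
                 memoryFraction: Double = 0.6,
                 storageFraction: Double = 0.5): Double =
    unifiedMiB(heapMiB, memoryFraction) * storageFraction

  def main(args: Array[String]): Unit = {
    val heapMiB = 3072.0 // from -Xmx3072m
    println(f"unified region:    ${unifiedMiB(heapMiB)}%.1f MiB")
    println(f"protected storage: ${storageMiB(heapMiB)}%.1f MiB")
    println(f"remaining heap:    ${heapMiB - unifiedMiB(heapMiB)}%.1f MiB")
  }
}
```

Under these assumptions the unified region is about 1663 MiB, so a 1.2 GB execution peak fits inside it, while the rest of the heap (user objects, such as large JSON strings, plus the reserve) has to fit in roughly 1.4 GiB.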
The question: how can I fix the OOM issue?