Zed

Reputation: 61

Apache Flink: Cannot write out complex data type for Parquet

I am trying to write complex data types (e.g. Array, Map) into a Parquet File Format using Apache Flink. For my use-case, I am reading data from a JSON file, doing some internal data conversions and then attempting to use a FileSink.

However, it does not work, which is strange given that the Parquet documentation states the following:

Parquet is built from the ground up with complex nested data structures in mind, and uses the record shredding and assembly algorithm described in the Dremel paper. We believe this approach is superior to simple flattening of nested name spaces.

I would therefore expect it to be able to process nested data types properly, unless I am doing something wrong.
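
For illustration, the Parquet LIST logical type can represent exactly this column. Here is a sketch using parquet-mr's schema parser (the FIXED_LEN_BYTE_ARRAY(6) physical type is my assumption for DECIMAL(12, 6); Flink's converter may choose a different encoding):

import org.apache.parquet.schema.MessageTypeParser

// One valid Parquet layout for a list of DECIMAL(12, 6), following the
// Parquet LIST specification; 6 bytes is the minimal fixed width that can
// hold a signed 12-digit unscaled value.
val nestedSchema = MessageTypeParser.parseMessageType(
  """message orders {
    |  required group discount (LIST) {
    |    repeated group list {
    |      optional fixed_len_byte_array(6) element (DECIMAL(12,6));
    |    }
    |  }
    |}""".stripMargin)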

Here is the error message:

Caused by: java.lang.UnsupportedOperationException: Unsupported type: ARRAY<DECIMAL(12, 6)>
    at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:615)
    at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:553)
    at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetMessageType(ParquetSchemaConverter.java:547)
    at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:72)
    at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:70)
    at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder.getWriteSupport(ParquetRowDataBuilder.java:67)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:563)
    at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$FlinkParquetBuilder.createWriter(ParquetRowDataBuilder.java:135)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56)

Here is the JSON file that I am using, stored under src/main/resources/NESTED.json:

{"discount":[670237.997082,634079.372133,303534.821218]}

Source Code:

// Assumes the Java bridge API; WriteParquetJobExample and Config are
// project-local helpers.
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.conversion.RowRowConverter
import org.apache.flink.table.types.FieldsDataType
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.types.Row

import scala.collection.JavaConverters._

object ReadJsonNestedData {
  def main(args: Array[String]): Unit = {
    // setup
    val jsonResource = getClass.getResource("/NESTED.json")
    val jsonFilePath = jsonResource.getPath
    val tableName = "orders"
    val readJSONTable =
      s"""
         | CREATE TABLE $tableName (
         |   `discount` ARRAY<DECIMAL(12, 6)>
         | ) WITH (
         |    'connector' = 'filesystem',
         |    'path' = '$jsonFilePath',
         |    'format' = 'json'
         |)""".stripMargin

    // Mirror the table schema as a RowType / FieldsDataType for the
    // Row -> RowData conversion and the Parquet writer factory
    val colFields = Array("discount")
    val defaultDataTypes = Array(DataTypes.ARRAY(DataTypes.DECIMAL(12, 6)))
    val rowType = RowType.of(defaultDataTypes.map(_.getLogicalType), colFields)
    val defaultDataTypesAsList = defaultDataTypes.toList.asJava
    val dataType = new FieldsDataType(rowType, defaultDataTypesAsList)

    // Job
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    tableEnv.executeSql(readJSONTable)
    val ordersTable = tableEnv.from(tableName)
    val dataStream = tableEnv
      .toDataStream(ordersTable)
      .map(new ConvertRowToRowDataMapFunction(dataType))
    val sink = FileSink
      .forBulkFormat(
        WriteParquetJobExample.outputBasePath,
        ParquetRowDataBuilder.createWriterFactory(rowType, Config.hadoopConfig, true)
      )
      .build()
    dataStream.sinkTo(sink)
    env.execute()
  }
}

class ConvertRowToRowDataMapFunction(fieldsDataType: FieldsDataType)
    extends RichMapFunction[Row, RowData] {
  // Converts the external Row produced by the Table API into Flink's
  // internal RowData representation expected by the Parquet writer
  private final val rowRowConverter = RowRowConverter.create(fieldsDataType)

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    rowRowConverter.open(this.getClass.getClassLoader)
  }

  override def map(row: Row): RowData =
    this.rowRowConverter.toInternal(row)
}
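
As a side note, the stack trace already shows that the exception is thrown while ParquetRowDataBuilder constructs its write support, before any record is written. Swapping the sink for a simple print (a minimal check, reusing the table defined above) exercises only the JSON source and the type mapping:

// Run in place of the FileSink wiring above (assumption: same main method,
// so tableEnv and tableName are in scope). This never touches the Parquet
// schema converter that throws in the stack trace.
tableEnv.executeSql(s"SELECT * FROM $tableName").print()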

Environment:

Thank you in advance for the help!

Upvotes: 2

Views: 1245

Answers (1)

renqs

Reputation: 61

Unfortunately, the MAP, ARRAY, and ROW types are only supported by the Flink Parquet format as of Flink 1.15 (see FLINK-17782), which has not been released yet. You may want to upgrade to Flink 1.15 once it is released, or build your own implementation based on the latest code on the master branch for now.
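
If neither option works for you right now, one possible stopgap (my own sketch, not part of Flink or the answer above; class and value names are hypothetical) is to flatten the unsupported ARRAY column into a plain STRING before the rows reach the Parquet writer, since primitive types are accepted by the pre-1.15 converter:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.table.data.{GenericRowData, RowData, StringData}
import org.apache.flink.table.types.logical.{LogicalType, RowType, VarCharType}
import org.apache.flink.types.Row

// Hypothetical drop-in for ConvertRowToRowDataMapFunction: renders the
// decimal array as one string so the writer only sees a supported type.
class ArrayToStringMapFunction extends RichMapFunction[Row, RowData] {
  override def map(row: Row): RowData = {
    // ARRAY<DECIMAL(12, 6)> is exposed externally as java.math.BigDecimal[]
    val discounts = row.getFieldAs[Array[java.math.BigDecimal]]("discount")
    // e.g. "[670237.997082,634079.372133,303534.821218]"
    GenericRowData.of(StringData.fromString(discounts.mkString("[", ",", "]")))
  }
}

object ArrayToStringWorkaround {
  // Writer schema with a STRING column in place of the array; pass this to
  // ParquetRowDataBuilder.createWriterFactory instead of the original rowType.
  val stringRowType: RowType = RowType.of(
    Array[LogicalType](new VarCharType(VarCharType.MAX_LENGTH)),
    Array("discount"))
}

The obvious cost is that consumers read back a serialized string rather than a native Parquet LIST, so this only makes sense as a bridge until 1.15.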

Upvotes: 2
