Reputation: 61
I am trying to write complex data types (e.g. Array, Map) to the Parquet file format using Apache Flink. For my use case, I read data from a JSON file, do some internal data conversions, and then attempt to write the result with a FileSink.
However, it doesn't work. This is strange, because the Parquet documentation states the following:
Parquet is built from the ground up with complex nested data structures in mind, and uses the record shredding and assembly algorithm described in the Dremel paper. We believe this approach is superior to simple flattening of nested name spaces.
I would therefore expect it to handle nested data types properly, unless I am doing something wrong.
Here is the error message:
Caused by: java.lang.UnsupportedOperationException: Unsupported type: ARRAY<DECIMAL(12, 6)>
at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:615)
at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:553)
at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetMessageType(ParquetSchemaConverter.java:547)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:72)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:70)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder.getWriteSupport(ParquetRowDataBuilder.java:67)
at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:563)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$FlinkParquetBuilder.createWriter(ParquetRowDataBuilder.java:135)
at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56)
Here is the JSON file that I'm using, stored under src/main/resources/NESTED.json:
{"discount":[670237.997082,634079.372133,303534.821218]}
Source Code:
// Imports assume Flink 1.13 with the Java bridge API; WriteParquetJobExample and
// Config are defined elsewhere in my project (output base path and Hadoop configuration).
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.conversion.RowRowConverter
import org.apache.flink.table.types.FieldsDataType
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.types.Row

import scala.collection.JavaConverters._

object ReadJsonNestedData {

  def main(args: Array[String]): Unit = {
    // Setup: locate the JSON resource and define a table over it.
    val jsonResource = getClass.getResource("/NESTED.json")
    val jsonFilePath = jsonResource.getPath
    val tableName = "orders"
    val readJSONTable =
      s"""
         | CREATE TABLE $tableName (
         |   `discount` ARRAY<DECIMAL(12, 6)>
         | ) WITH (
         |   'connector' = 'filesystem',
         |   'path' = '$jsonFilePath',
         |   'format' = 'json'
         |)""".stripMargin

    // Logical row type and DataType shared by the converter and the Parquet writer.
    val colFields = Array("discount")
    val defaultDataTypes = Array(DataTypes.ARRAY(DataTypes.DECIMAL(12, 6)))
    val rowType = RowType.of(defaultDataTypes.map(_.getLogicalType), colFields)
    val defaultDataTypesAsList = defaultDataTypes.toList.asJava
    val dataType = new FieldsDataType(rowType, defaultDataTypesAsList)

    // Job: read the JSON table, convert Row -> RowData, write Parquet via FileSink.
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    tableEnv.executeSql(readJSONTable)
    val ordersTable = tableEnv.from(tableName)
    val dataStream = tableEnv
      .toDataStream(ordersTable)
      .map(new ConvertRowToRowDataMapFunction(dataType))
    val sink = FileSink
      .forBulkFormat(
        WriteParquetJobExample.outputBasePath,
        ParquetRowDataBuilder.createWriterFactory(rowType, Config.hadoopConfig, true)
      )
      .build()
    dataStream.sinkTo(sink)
    env.execute()
  }
}

// Converts the external Row produced by the Table API into Flink's internal RowData,
// which is what ParquetRowDataBuilder expects.
class ConvertRowToRowDataMapFunction(fieldsDataType: FieldsDataType)
    extends RichMapFunction[Row, RowData] {

  private final val rowRowConverter = RowRowConverter.create(fieldsDataType)

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    rowRowConverter.open(this.getClass.getClassLoader)
  }

  override def map(row: Row): RowData =
    this.rowRowConverter.toInternal(row)
}
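For reference, the JSON source side can be checked in isolation with a bounded query against the same table definition. A small sketch (a blocking print(), intended only for local testing) that should read the ARRAY<DECIMAL(12, 6)> column without problems on Flink 1.13, narrowing the failure down to the Parquet sink rather than the JSON source:
// Reads the nested column back through the same "orders" table and prints it;
// the JSON format handles nested types, so only the Parquet writer side fails.
tableEnv.executeSql(s"SELECT discount FROM $tableName").print()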
Environment:
Java: 1.8
Scala: 2.12.15
Flink: 1.13.5
Flink dependencies: flink-table-api-java-bridge, flink-table-planner-blink, flink-clients, flink-json
Thank you in advance for the help!
Upvotes: 2
Views: 1245
Reputation: 61
Unfortunately, MAP, ARRAY and ROW types are only supported by the Flink Parquet format since Flink 1.15 (see FLINK-17782), which has not been released yet. You may want to upgrade to Flink 1.15 once it is released, or build your own implementation based on the latest code on the master branch for now.
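For reference, here is a minimal sketch of how the writer creation could be verified once on Flink 1.15 (the object name, the local output path /tmp/nested-check.parquet and the default Hadoop Configuration are placeholders for this check, not part of your job). Creating the writer exercises the same schema conversion that throws the UnsupportedOperationException on 1.13:
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.types.logical.RowType
import org.apache.hadoop.conf.Configuration

object NestedParquetWriterCheck {
  def main(args: Array[String]): Unit = {
    // Same single-column schema as in the question: ARRAY<DECIMAL(12, 6)>.
    val rowType = RowType.of(
      Array(DataTypes.ARRAY(DataTypes.DECIMAL(12, 6)).getLogicalType),
      Array("discount")
    )
    val factory =
      ParquetRowDataBuilder.createWriterFactory(rowType, new Configuration(), true)

    // Creating the writer triggers the Flink-to-Parquet schema conversion, which is
    // exactly where the 1.13 stack trace comes from; on 1.15+ it should succeed.
    val out = FileSystem
      .getLocalFileSystem()
      .create(new Path("/tmp/nested-check.parquet"), FileSystem.WriteMode.OVERWRITE)
    val writer = factory.create(out)
    writer.finish()
    out.close()
    println("ARRAY<DECIMAL(12, 6)> converted to a Parquet schema successfully")
  }
}
After the upgrade the user code from the question should stay the same; FLINK-17782 only changes the internal ParquetSchemaConverter so that ARRAY, MAP and ROW columns can be mapped to Parquet group types.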
Upvotes: 2