James Wierzba

Reputation: 17548

How to write TIMESTAMP logical type (INT96) to parquet, using ParquetWriter?

I have a tool that uses a org.apache.parquet.hadoop.ParquetWriter to convert CSV data files to parquet data files.

Currently, it only handles the int32, double, and string column types.

I need to support the parquet timestamp logical type (annotated as int96), and I am lost on how to do that because I can't find a precise specification online.

It appears this timestamp encoding (int96) is rare and not well supported. I've found very few specification details online. This GitHub README states that:

Timestamps saved as an int96 are made up of the nanoseconds in the day (first 8 bytes) and the Julian day (last 4 bytes).

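The quoted layout can be sketched in plain Java, with no Parquet classes involved: an 8-byte nanos-of-day value followed by a 4-byte Julian day number. The little-endian byte order is an assumption based on how Spark and Impala write INT96, and the class/method names here are mine, not part of any Parquet API:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.temporal.JulianFields;

public class Int96Layout {
    // First 8 bytes: nanoseconds within the day (little-endian long)
    // Last 4 bytes: Julian day number (little-endian int)
    static byte[] encode(LocalDate date, LocalTime time) {
        ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
        buf.putLong(time.toNanoOfDay());
        buf.putInt((int) JulianFields.JULIAN_DAY.getFrom(date));
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] ts = encode(LocalDate.of(2019, 2, 13), LocalTime.of(13, 35, 5));
        System.out.println(ts.length); // 12
    }
}
```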
Specifically:

  1. Which parquet Type do I use for the column in MessageType schema? I assume I should use the primitive type, PrimitiveTypeName.INT96, but I'm not sure if there may be a way to specify a logical type?
  2. How do I write the data? i.e. In what format do I write the timestamp to the group? For an INT96 timestamp, I assume I must write some binary type?

Here is a simplified version of my code that demonstrates what I am trying to do. Specifically, take a look at the "TODO" comments, these are the two points in the code that correlate to the questions above.

List<Type> fields = new ArrayList<>();
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int32_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "double_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "string_col", OriginalType.UTF8));

// TODO: 
//   Specify the TIMESTAMP type. 
//   How? INT96 primitive type? Is there a logical timestamp type I can use w/ MessageType schema?
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT96, "timestamp_col", null)); 

MessageType schema = new MessageType("input", fields);

// initialize writer
Configuration configuration = new Configuration();
configuration.setQuietMode(true);
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
  new Path("output.parquet"),
  new GroupWriteSupport(),
  CompressionCodecName.SNAPPY,
  ParquetWriter.DEFAULT_BLOCK_SIZE,
  ParquetWriter.DEFAULT_PAGE_SIZE,
  1048576,
  true,
  false,
  ParquetProperties.WriterVersion.PARQUET_1_0,
  configuration
);

// write CSV data
CSVParser parser = CSVParser.parse(new File(csv), StandardCharsets.UTF_8, CSVFormat.TDF.withQuote(null));
ArrayList<String> columns = new ArrayList<>(schemaMap.keySet());
int colIndex;
int rowNum = 0;
for (CSVRecord csvRecord : parser) {
  rowNum ++;
  Group group = f.newGroup();
  colIndex = 0;
  for (String record : csvRecord) {
    if (record == null || record.isEmpty() || record.equals("NULL")) {
      colIndex++;
      continue;
    }


    record = record.trim();

    switch (colIndex) {
      case 0: // int32
        group.add(colIndex, Integer.parseInt(record));
        break;
      case 1: // double
        group.add(colIndex, Double.parseDouble(record));
        break;
      case 2: // string
        group.add(colIndex, record);
        break;
      case 3: // timestamp
        // TODO: convert CSV string value to TIMESTAMP type (how?)
        throw new NotImplementedException();
    }
    colIndex++;
  }
  writer.write(group);
}
writer.close();

Upvotes: 12

Views: 31090

Answers (3)

Shubham Dhingra

Reputation: 304

For those using AvroParquetWriter who want to write the INT96 physical type, you can set

final Configuration conf = new Configuration();
conf.setStrings(WRITE_FIXED_AS_INT96, "field_name");

and pass this configuration when building the AvroParquetWriter (WRITE_FIXED_AS_INT96 is a constant defined on AvroWriteSupport). Your Avro schema must declare field_name as a fixed type of size 12, similar to:

"type":[
        "null",
        {
               "type":"fixed",
               "name":"INT96",
               "doc":"INT96 represented as byte[12]",
               "size":12
        }
]

Full example:

final String avroSchemaString = "{\n" +
        "   \"type\":\"record\",\n" +
        "   \"name\":\"userInfo\",\n" +
        "   \"namespace\":\"my.example\",\n" +
        "   \"fields\":[\n" +
        "      {\n" +
        "         \"name\":\"date_of_birth\",\n" +
        "         \"type\":[\n" +
        "            \"null\",\n" +
        "            {\n" +
        "               \"type\":\"fixed\",\n" +
        "               \"name\":\"INT96\",\n" +
        "               \"doc\":\"INT96 represented as byte[12]\",\n" +
        "               \"size\":12\n" +
        "            }\n" +
        "         ]\n" +
        "      }\n" +
        "   ]\n" +
        "}";
System.out.println("AvroSchema: " + avroSchemaString);

final Schema avroSchema = new Schema.Parser().parse(avroSchemaString);
System.out.println("Parsed AvroSchema: " + avroSchema);

final Path outputPath = new Path("/tmp/temp.parquet");
final Configuration conf = new Configuration();
// Comment this line and it will write as FIXED_LEN_BYTE_ARRAY of size 12
conf.setStrings(WRITE_FIXED_AS_INT96,  "date_of_birth");


final ParquetWriter<GenericData.Record> parquetWriter = AvroParquetWriter
        .<GenericData.Record>builder(outputPath)
        .withSchema(avroSchema)
        .withConf(conf)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .build();
final GenericData.Record record = new GenericData.Record(avroSchema);

// Convert LocalDate to NanoTime or LocalDateTime to NanoTime
final LocalDate dateToday = LocalDate.now();
final NanoTime nanoTime = new NanoTime((int)JulianFields.JULIAN_DAY.getFrom(dateToday), 0L);
byte[] timestampBuffer = nanoTime.toBinary().getBytes();

// Should be 12
System.out.println(timestampBuffer.length);

GenericData.Fixed fixed = new GenericData.Fixed(avroSchema.getFields().get(0).schema(), timestampBuffer);
record.put("date_of_birth", fixed);
parquetWriter.write(record);

// Close the writer to flush records
parquetWriter.close();

This requires a version of parquet-avro that has WRITE_FIXED_AS_INT96; this example was written against 1.12.3. GAV for that:

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.3</version>
</dependency>

Upvotes: 0

James Wierzba

Reputation: 17548

I figured it out, using this code from Spark SQL as a reference.

The INT96 binary encoding is split into 2 parts:

  1. First 8 bytes: nanoseconds since midnight
  2. Last 4 bytes: Julian day

String value = "2019-02-13 13:35:05";

final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1);
final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1);
final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);

// Parse date (parse in UTC so the wall-clock fields read below are consistent)
SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
parser.setTimeZone(TimeZone.getTimeZone("UTC"));
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
cal.setTime(parser.parse(value));

// Calculate Julian days and nanoseconds in the day
LocalDate dt = LocalDate.of(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH)+1, cal.get(Calendar.DAY_OF_MONTH));
int julianDays = (int) JulianFields.JULIAN_DAY.getFrom(dt);
long nanos = (cal.get(Calendar.HOUR_OF_DAY) * NANOS_PER_HOUR)
        + (cal.get(Calendar.MINUTE) * NANOS_PER_MINUTE)
        + (cal.get(Calendar.SECOND) * NANOS_PER_SECOND);

// Write INT96 timestamp
byte[] timestampBuffer = new byte[12];
ByteBuffer buf = ByteBuffer.wrap(timestampBuffer);
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(nanos).putInt(julianDays);

// This is the properly encoded INT96 timestamp
Binary tsValue = Binary.fromReusedByteArray(timestampBuffer);

Upvotes: 3

Zoltan

Reputation: 3105

  1. INT96 timestamps use the INT96 physical type without any logical type, so don't annotate them with anything.
  2. If you are interested in the structure of an INT96 timestamp, take a look here. If you would like to see sample code that converts to and from this format, take a look at this file from Hive.

Upvotes: 8
