Akshata

Reputation: 1025

Apache Calcite: cast integer to datetime

I am using Beam SQL and trying to cast an integer field to a datetime field.

  Schema resultSchema =
    Schema.builder()
          .addInt64Field("detectedCount")
          .addStringField("sensor")
          .addInt64Field("timestamp")
          .build();

  PCollection<Row> sensorRawUnboundedTimestampedSubset = 
    sensorRowUnbounded.apply(
        SqlTransform.query(
          "select PCOLLECTION.payload.`value`.`count` detectedCount, \n"
          + "PCOLLECTION.payload.`value`.`id` sensor, \n"
          + "PCOLLECTION.`timestamp` `timestamp` \n"
          + "from PCOLLECTION "))
    .setRowSchema(resultSchema);

For some computation and windowing, I want to convert/cast the timestamp field to a DateTime field. Please provide some pointers on converting the timestamp in resultSchema to the DateTime datatype.

Upvotes: 0

Views: 1971

Answers (1)

Anton

Reputation: 2539

There is no out-of-the-box way to do that in Beam (or in Calcite). Short version: neither Calcite nor Beam has any way of knowing how you actually store the dates or timestamps in the integers. However, assuming you have epoch millis, this should work:

@Test
public void testBlah() throws Exception {
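  // 'pipeline' is assumed to be a TestPipeline test rule, e.g. TestPipeline.create()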
  // input schema, has timestamps as epoch millis
  Schema schema = Schema.builder().addInt64Field("ts").addStringField("st").build();

  DateTime ts1 = new DateTime(2019, 8, 9, 10, 11, 12);
  DateTime ts2 = new DateTime(2019, 8, 9, 10, 11, 12);

  PCollection<Row> input =
    pipeline
      .apply(
          "createRows",
          Create.of(
              Row.withSchema(schema).addValues(ts1.getMillis(), "two").build(),
              Row.withSchema(schema).addValues(ts2.getMillis(), "twelve").build()))
      .setRowSchema(schema);

  PCollection<Row> result =
    input.apply(
      SqlTransform.query(
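          // ts * INTERVAL '0.001' SECOND turns the epoch-millis long into an interval of
          // ts milliseconds; adding it to the epoch start yields a SQL TIMESTAMP, which
          // becomes a DateTime field in the output row.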
          "SELECT \n"
          + "(TIMESTAMP '1970-01-01 00:00:00' + ts * INTERVAL '0.001' SECOND) as ts, \n"
          + "st \n"
          + "FROM \n"
          + "PCOLLECTION"));

  // output schema, has timestamps as DateTime
  Schema outSchema = Schema.builder().addDateTimeField("ts").addStringField("st").build();
  PAssert.that(result)
    .containsInAnyOrder(
        Row.withSchema(outSchema).addValues(ts1, "two").build(),
        Row.withSchema(outSchema).addValues(ts2, "twelve").build());
  pipeline.run();
}

Alternatively, you can always do it in Java rather than in SQL: apply a custom ParDo to the output of the SqlTransform. In that ParDo, extract the integer timestamp from the Row object, convert it to a DateTime, and then emit it, e.g. as part of another row with a different schema.
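
For reference, here is a minimal sketch of that ParDo approach, building on resultSchema and sensorRawUnboundedTimestampedSubset from the question. The schema variable, transform name and time zone below are my own choices, and it assumes the timestamp field holds epoch millis:

// Output schema: same fields as resultSchema, but with timestamp as a DateTime.
Schema withDateTimeSchema =
  Schema.builder()
        .addInt64Field("detectedCount")
        .addStringField("sensor")
        .addDateTimeField("timestamp")
        .build();

PCollection<Row> withDateTime =
  sensorRawUnboundedTimestampedSubset
    .apply(
        "epochMillisToDateTime",
        ParDo.of(
            new DoFn<Row, Row>() {
              @ProcessElement
              public void processElement(@Element Row row, OutputReceiver<Row> out) {
                // Read the epoch-millis long and re-emit it as a Joda DateTime (UTC here).
                out.output(
                    Row.withSchema(withDateTimeSchema)
                        .addValues(
                            row.getInt64("detectedCount"),
                            row.getString("sensor"),
                            new DateTime(row.getInt64("timestamp"), DateTimeZone.UTC))
                        .build());
              }
            }))
    .setRowSchema(withDateTimeSchema);

After that transform the timestamp field is a DATETIME in the row schema, so windowing and further computation can use it directly.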

Upvotes: 3
