Cole356
Cole356

Reputation: 11

Groovy function to use conditional operator and datetime parsing

I want to write a Groovy function in the "transformConfigs" section of the table config in Apache Pinot. I want to assign the value 'A' to the column if the time-of-day component of my timestamp is after 07:00:00, and otherwise assign the value 'B'.

Here is what I have tried, but none of these attempts work:

Groovy({LocalDateTime.parse(t, \"yyyy-MM-dd hh:mm:ss.SSS\").toDate().format('HH:mm:ss') > LocalTime.of(7, 0, 0) ? 'A' : 'B'}, t)
 Groovy({java.time.format.DateTimeFormatter.LocalDateTime.parse(t, "yyyy-MM-dd hh:mm:ss.SSS").toDate().format('HH:mm:ss') > java.time.format.DateTimeFormatter.LocalTime.of(7, 0, 0) ? 'A' : 'B'}, t)
Groovy({SimpleDateFormat time_now = new SimpleDateFormat(\"yyyy-MM-dd hh:mm:ss.SSS\"); SimpleDateFormat output = new SimpleDateFormat(\"hh:mm:ss.SSS\"); Date d = time_now.parse(t); String f_time_now = output.format(d); String f_shift_time = output.format(\"07:00:00.000\"); def result = formattedTime > f_shift_time ? 'A' : 'B'; return result}, t)

Table config:


    {
      "REALTIME": {
        "tableName": "test_REALTIME",
        "tableType": "REALTIME",
        "segmentsConfig": {
          "schemaName": "test",
          "replication": "1",
          "timeColumnName": "t",
          "allowNullTimeValue": false,
          "replicasPerPartition": "1"
        },
        "tenants": {
          "broker": "DefaultTenant",
          "server": "DefaultTenant",
          "tagOverrideConfig": {}
        },
        "tableIndexConfig": {
          "invertedIndexColumns": [],
          "noDictionaryColumns": [],
          "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.topic.name": "test_topic",
            "stream.kafka.broker.list": "localhost:9092",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            ...
          },
          "rangeIndexColumns": [],
          "rangeIndexVersion": 2,
          "autoGeneratedInvertedIndex": false,
          "createInvertedIndexDuringSegmentGeneration": false,
          "sortedColumn": [],
          "bloomFilterColumns": [],
          "loadMode": "MMAP",
          "onHeapDictionaryColumns": [],
          "varLengthDictionaryColumns": [],
          "enableDefaultStarTree": false,
          "enableDynamicStarTreeCreation": false,
          "aggregateMetrics": false,
          "nullHandlingEnabled": false
        },
        "metadata": {},
        "quota": {},
        "routing": {},
        "query": {},
        "ingestionConfig": {
          "transformConfigs": [
            {
              "columnName": "shift",
              "transformFunction": "Groovy({LocalDateTime.parse(t, \"yyyy-MM-dd hh:mm:ss.SSS\").toDate().format('HH:mm:ss') > LocalTime.of(7, 0, 0) ? 'A' : 'B'}, t)"
            }
          ]
        },
        "isDimTable": false
      }
    }

Schema:

    {
      "schemaName": "test",
      "dimensionFieldSpecs": [
        {
          "name": "shift",
          "dataType": "STRING"
        }
      ],
      "dateTimeFieldSpecs": [
        {
          "name": "t",
          "dataType": "TIMESTAMP",
          "format": "1:MILLISECONDS:EPOCH",
          "granularity": "1:MILLISECONDS"
        }
      ]
    }

Output: no ingestion takes place.

Thank you in advance. Let me know if I can provide more details.

Upvotes: 1

Views: 250

Answers (0)

Related Questions