Rajat Sinha

Reputation: 1

Convert BYTES DataType to DECIMAL in Apache Pinot

I am ingesting real-time data from an Apache Kafka topic into Apache Pinot. The messages are stored in Avro format with a Schema Registry. Because the Avro records are nested, I use the JSONPATH family of functions (e.g. `JSONPATHLONG(after, '$.id')`) to extract fields from the nested data. All fields are ingested correctly except `cost`, `tax`, `cod_charges`, and `entry_tax`. These are stored in the Avro topic with the BYTES data type. In the original dataset these fields are integers; they were converted to bytes when the data was written to the Kafka topic in Avro format. I want to convert them back to a decimal or integer type while ingesting into Apache Pinot. However, I cannot find a function that extracts a BYTES value from nested data.

My ingestionConfig looks like this:

"ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "id",
        "transformFunction": "JSONPATHLONG(after, '$.id')"
      },
      {
        "columnName": "order_id",
        "transformFunction": "JSONPATHLONG(after, '$.order_id')"
      },
      {
        "columnName": "company_id",
        "transformFunction": "JSONPATHLONG(after, '$.company_id')"
      },
      {
        "columnName": "channel_id",
        "transformFunction": "JSONPATHLONG(after, '$.channel_id')"
      },
      {
        "columnName": "invoice_no",
        "transformFunction": "JSONPATHSTRING(after, '$.invoice_no')"
      },
      {
        "columnName": "encrypt_invoice_name",
        "transformFunction": "JSONPATHSTRING(after, '$.encrypt_invoice_name')"
      },
      {
        "columnName": "courier",
        "transformFunction": "JSONPATHSTRING(after, '$.courier')"
      },
      {
        "columnName": "sr_courier_id",
        "transformFunction": "JSONPATHLONG(after, '$.sr_courier_id')"
      },
      {
        "columnName": "code",
        "transformFunction": "JSONPATHSTRING(after, '$.code')"
      },
      {
        "columnName": "awb",
        "transformFunction": "JSONPATHSTRING(after, '$.awb')"
      },
      {
        "columnName": "pickup_token_number",
        "transformFunction": "JSONPATHSTRING(after, '$.pickup_token_number')"
      },
      {
        "columnName": "dhl_handover_id",
        "transformFunction": "JSONPATHSTRING(after, '$.dhl_handover_id')"
      },
      {
        "columnName": "dhl_handover_url",
        "transformFunction": "JSONPATHSTRING(after, '$.dhl_handover_url')"
      },
      {
        "columnName": "pickup_address_id",
        "transformFunction": "JSONPATHLONG(after, '$.pickup_address_id')"
      },
      {
        "columnName": "pickup_reshedule_count",
        "transformFunction": "JSONPATHDOUBLE(after, '$.pickup_reshedule_count')"
      },
      {
        "columnName": "return_pickup_address_id",
        "transformFunction": "JSONPATHDOUBLE(after, '$.return_pickup_address_id')"
      },
      {
        "columnName": "dhl_pickup_url",
        "transformFunction": "JSONPATHSTRING(after, '$.dhl_pickup_url')"
      },
      {
        "columnName": "method",
        "transformFunction": "JSONPATHSTRING(after, '$.method')"
      },
      {
        "columnName": "channel_shipment_id",
        "transformFunction": "JSONPATHSTRING(after, '$.channel_shipment_id')"
      },
      {
        "columnName": "weight",
        "transformFunction": "JSONPATHSTRING(after, '$.weight')"
      },
      {
        "columnName": "dimensions",
        "transformFunction": "JSONPATHSTRING(after, '$.dimensions')"
      },
      {
        "columnName": "volumetric_weight",
        "transformFunction": "JSONPATHSTRING(after, '$.volumetric_weight')"
      },
      {
        "columnName": "quantity",
        "transformFunction": "JSONPATHDOUBLE(after, '$.quantity')"
      },
      {
        "columnName": "status",
        "transformFunction": "JSONPATHDOUBLE(after, '$.status')"
      },
      {
        "columnName": "state_type",
        "transformFunction": "JSONPATHDOUBLE(after, '$.state_type')"
      },
      {
        "columnName": "sub_status",
        "transformFunction": "JSONPATHDOUBLE(after, '$.sub_status')"
      },
      {
        "columnName": "status_code",
        "transformFunction": "JSONPATHSTRING(after, '$.status_code')"
      },
      {
        "columnName": "shipment_zone",
        "transformFunction": "JSONPATHDOUBLE(after, '$.shipment_zone')"
      },
      {
        "columnName": "label_url",
        "transformFunction": "JSONPATHSTRING(after, '$.label_url')"
      },
      {
        "columnName": "manifest_url",
        "transformFunction": "JSONPATHSTRING(after, '$.manifest_url')"
      },
      {
        "columnName": "is_locked",
        "transformFunction": "JSONPATHSTRING(after, '$.is_locked')"
      },
      {
        "columnName": "customer_gstin",
        "transformFunction": "JSONPATHSTRING(after, '$.customer_gstin')"
      },
      {
        "columnName": "eway_bill_number",
        "transformFunction": "JSONPATHSTRING(after, '$.eway_bill_number')"
      },
      {
        "columnName": "pod",
        "transformFunction": "JSONPATHSTRING(after, '$.pod')"
      },
      {
        "columnName": "frozen_weight",
        "transformFunction": "JSONPATHDOUBLE(after, '$.frozen_weight')"
      },
      {
        "columnName": "isd_code",
        "transformFunction": "JSONPATHSTRING(after, '$.isd_code')"
      },
      {
        "columnName": "seller_address",
        "transformFunction": "JSONPATHSTRING(after, '$.seller_address')"
      },
      {
        "columnName": "shipping_address",
        "transformFunction": "JSONPATHSTRING(after, '$.shipping_address')"
      },
      {
        "columnName": "customer_details",
        "transformFunction": "JSONPATHSTRING(after, '$.customer_details')"
      },
      {
        "columnName": "comment",
        "transformFunction": "JSONPATHSTRING(after, '$.comment')"
      },
      {
        "columnName": "others",
        "transformFunction": "JSONPATHSTRING(after, '$.others')"
      },
      {
        "columnName": "entry_tax",
        "transformFunction": "JSONPATHDOUBLE(after, '$.entry_tax')"
      },
      {
        "columnName": "cost",
        "transformFunction": "JSONPATHDOUBLE(after, '$.cost')"
      },
      {
        "columnName": "tax",
        "transformFunction": "JSONPATHDOUBLE(after, '$.tax')"
      },
      {
        "columnName": "cod_charges",
        "transformFunction": "JSONPATHDOUBLE(after, '$.cod_charges')"
      },
      {
        "columnName": "total",
        "transformFunction": "JSONPATHDOUBLE(after, '$.total')"
      },
      {
        "columnName": "invoice_date",
        "transformFunction": "JSONPATHLONG(after, '$.invoice_date')"
      },
      {
        "columnName": "awb_assign_date",
        "transformFunction": "JSONPATHLONG(after, '$.awb_assign_date')"
      },
      {
        "columnName": "pickup_generated_date",
        "transformFunction": "JSONPATHLONG(after, '$.pickup_generated_date')"
      },
      {
        "columnName": "pickup_scheduled_date",
        "transformFunction": "JSONPATHLONG(after, '$.pickup_scheduled_date')"
      },
      {
        "columnName": "out_for_pickup_date",
        "transformFunction": "JSONPATHSTRING(after, '$.out_for_pickup_date')"
      },
      {
        "columnName": "created_at",
        "transformFunction": "JSONPATHSTRING(after, '$.created_at')"
      },
      {
        "columnName": "updated_at",
        "transformFunction": "JSONPATHSTRING(after, '$.updated_at')"
      },
      {
        "columnName": "rto_initiated_date",
        "transformFunction": "JSONPATHSTRING(after, '$.rto_initiated_date')"
      },
      {
        "columnName": "rto_delivered_date",
        "transformFunction": "JSONPATHSTRING(after, '$.rto_delivered_date')"
      },
      {
        "columnName": "updated_on",
        "transformFunction": "JSONPATHSTRING(after, '$.updated_on')"
      },
      {
        "columnName": "etd",
        "transformFunction": "JSONPATHLONG(after, '$.etd')"
      },
      {
        "columnName": "promised_pickup_tat",
        "transformFunction": "JSONPATHDOUBLE(after, '$.promised_pickup_tat')"
      },
      {
        "columnName": "promised_delivery_tat",
        "transformFunction": "JSONPATHDOUBLE(after, '$.promised_delivery_tat')"
      },
      {
        "columnName": "promised_rto_tat",
        "transformFunction": "JSONPATHDOUBLE(after, '$.promised_rto_tat')"
      },
      {
        "columnName": "promised_cod_remittance_tat",
        "transformFunction": "JSONPATHDOUBLE(after, '$.promised_cod_remittance_tat')"
      },
      {
        "columnName": "shipped_date",
        "transformFunction": "JSONPATHSTRING(after, '$.shipped_date')"
      },
      {
        "columnName": "delivered_date",
        "transformFunction": "JSONPATHSTRING(after, '$.delivered_date')"
      },
      {
        "columnName": "returned_date",
        "transformFunction": "JSONPATHSTRING(after, '$.returned_date')"
      },
      {
        "columnName": "eway_bill_date",
        "transformFunction": "JSONPATHSTRING(after, '$.eway_bill_date')"
      },
      {
        "columnName": "mps_data",
        "transformFunction": "JSONPATHSTRING(after, '$.mps_data')"
      }
    ],

And my schema file (schemaFile.json):

{
  "schemaName": "shipments",
  "enableColumnBasedNullHandling": true,
  "dimensionFieldSpecs": [
    { "name": "id", "dataType": "LONG" },
    { "name": "order_id", "dataType": "LONG" },
    { "name": "company_id", "dataType": "LONG" },
    { "name": "channel_id", "dataType": "LONG" },
    { "name": "invoice_no", "dataType": "STRING" },
    { "name": "encrypt_invoice_name", "dataType": "STRING" },
    { "name": "courier", "dataType": "STRING" },
    { "name": "sr_courier_id", "dataType": "INT" },
    { "name": "code", "dataType": "STRING" },
    { "name": "awb", "dataType": "STRING" },
    { "name": "pickup_token_number", "dataType": "STRING" },
    { "name": "pickup_address_id", "dataType": "LONG" },
    { "name": "return_pickup_address_id", "dataType": "INT" },
    { "name": "dhl_handover_id", "dataType": "STRING" },
    { "name": "dhl_handover_url", "dataType": "STRING" },
    { "name": "pickup_reshedule_count", "dataType": "INT" },
    { "name": "dhl_pickup_url", "dataType": "STRING" },
    { "name": "method", "dataType": "STRING" },
    { "name": "channel_shipment_id", "dataType": "STRING" },
    { "name": "weight", "dataType": "STRING" },
    { "name": "dimensions", "dataType": "STRING" },
    { "name": "volumetric_weight", "dataType": "STRING" },
    { "name": "quantity", "dataType": "INT" },
    { "name": "status", "dataType": "INT" },
    { "name": "state_type", "dataType": "INT" },
    { "name": "sub_status", "dataType": "INT" },
    { "name": "status_code", "dataType": "STRING" },
    { "name": "shipment_zone", "dataType": "INT" },
    { "name": "label_url", "dataType": "STRING" },
    { "name": "manifest_url", "dataType": "STRING" },
    { "name": "is_locked", "dataType": "INT" },
    { "name": "customer_gstin", "dataType": "STRING" },
    { "name": "eway_bill_number", "dataType": "STRING" },
    { "name": "pod", "dataType": "STRING" },
    { "name": "frozen_weight", "dataType": "INT" },
    { "name": "isd_code", "dataType": "STRING" },
    { "name": "seller_address", "dataType": "STRING" },
    { "name": "shipping_address", "dataType": "STRING" },
    { "name": "customer_details", "dataType": "STRING" },
    { "name": "comment", "dataType": "STRING" },
    { "name": "others", "dataType": "STRING" },
    { "name": "mps_data", "dataType": "STRING" }
  ],
  "metricFieldSpecs": [
    { "name": "entry_tax", "dataType": "DOUBLE", "defaultNullValue": null },
    { "name": "cost", "dataType": "DOUBLE", "defaultNullValue": null },
    { "name": "tax", "dataType": "DOUBLE", "defaultNullValue": null },
    { "name": "cod_charges", "dataType": "DOUBLE", "defaultNullValue": null },
    { "name": "total", "dataType": "DOUBLE", "defaultNullValue": null }
  ],
  "dateTimeFieldSpecs": [
    { "name": "invoice_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
    { "name": "awb_assign_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
    { "name": "pickup_generated_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
    { "name": "pickup_scheduled_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
    { "name": "out_for_pickup_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "created_at", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "updated_at", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "rto_initiated_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "rto_delivered_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "updated_on", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "etd", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
    { "name": "promised_pickup_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "promised_delivery_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "promised_rto_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "promised_cod_remittance_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "shipped_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "delivered_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "returned_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
    { "name": "eway_bill_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }
  ]
}

Please let me know if you need any other information to debug this issue.

I have tried declaring these columns as DOUBLE in the schema and extracting them with JSONPATHDOUBLE, but every value that arrives in Pinot is null.

My table in Apache Pinot:

Upvotes: 0

Views: 14

Answers (0)

Related Questions