Reputation: 1
I am working on ingesting real-time data from an Apache Kafka topic into Apache Pinot. The data in the topic is stored in Avro format with Schema Registry. Because the Avro data is nested, I am using `JSONPATH(query, 'query_field')`-style transform functions to extract fields from the nested data. All fields arrive correctly except `cost`, `tax`, `cod_charges` and `entry_tax`: these fields are stored in the Avro topic with the BYTES data type. Their actual data type in the original dataset is integer; they were converted to bytes when the data was written to the Kafka topic in Avro format. I want to convert them back into a decimal or integer type while ingesting into Apache Pinot, but there is no function for extracting a BYTES value from nested data in the topic.
My `ingestionConfig` looks like this:
"ingestionConfig": {
"transformConfigs": [
{
"columnName": "id",
"transformFunction": "JSONPATHLONG(after, '$.id')"
},
{
"columnName": "order_id",
"transformFunction": "JSONPATHLONG(after, '$.order_id')"
},
{
"columnName": "company_id",
"transformFunction": "JSONPATHLONG(after, '$.company_id')"
},
{
"columnName": "channel_id",
"transformFunction": "JSONPATHLONG(after, '$.channel_id')"
},
{
"columnName": "invoice_no",
"transformFunction": "JSONPATHSTRING(after, '$.invoice_no')"
},
{
"columnName": "encrypt_invoice_name",
"transformFunction": "JSONPATHSTRING(after, '$.encrypt_invoice_name')"
},
{
"columnName": "courier",
"transformFunction": "JSONPATHSTRING(after, '$.courier')"
},
{
"columnName": "sr_courier_id",
"transformFunction": "JSONPATHLONG(after, '$.sr_courier_id')"
},
{
"columnName": "code",
"transformFunction": "JSONPATHSTRING(after, '$.code')"
},
{
"columnName": "awb",
"transformFunction": "JSONPATHSTRING(after, '$.awb')"
},
{
"columnName": "pickup_token_number",
"transformFunction": "JSONPATHSTRING(after, '$.pickup_token_number')"
},
{
"columnName": "dhl_handover_id",
"transformFunction": "JSONPATHSTRING(after, '$.dhl_handover_id')"
},
{
"columnName": "dhl_handover_url",
"transformFunction": "JSONPATHSTRING(after, '$.dhl_handover_url')"
},
{
"columnName": "pickup_address_id",
"transformFunction": "JSONPATHLONG(after, '$.pickup_address_id')"
},
{
"columnName": "pickup_reshedule_count",
"transformFunction": "JSONPATHDOUBLE(after, '$.pickup_reshedule_count')"
},
{
"columnName": "return_pickup_address_id",
"transformFunction": "JSONPATHDOUBLE(after, '$.return_pickup_address_id')"
},
{
"columnName": "dhl_pickup_url",
"transformFunction": "JSONPATHSTRING(after, '$.dhl_pickup_url')"
},
{
"columnName": "method",
"transformFunction": "JSONPATHSTRING(after, '$.method')"
},
{
"columnName": "channel_shipment_id",
"transformFunction": "JSONPATHSTRING(after, '$.channel_shipment_id')"
},
{
"columnName": "weight",
"transformFunction": "JSONPATHSTRING(after, '$.weight')"
},
{
"columnName": "dimensions",
"transformFunction": "JSONPATHSTRING(after, '$.dimensions')"
},
{
"columnName": "volumetric_weight",
"transformFunction": "JSONPATHSTRING(after, '$.volumetric_weight')"
},
{
"columnName": "quantity",
"transformFunction": "JSONPATHDOUBLE(after, '$.quantity')"
},
{
"columnName": "status",
"transformFunction": "JSONPATHDOUBLE(after, '$.status')"
},
{
"columnName": "state_type",
"transformFunction": "JSONPATHDOUBLE(after, '$.state_type')"
},
{
"columnName": "sub_status",
"transformFunction": "JSONPATHDOUBLE(after, '$.sub_status')"
},
{
"columnName": "status_code",
"transformFunction": "JSONPATHSTRING(after, '$.status_code')"
},
{
"columnName": "shipment_zone",
"transformFunction": "JSONPATHDOUBLE(after, '$.shipment_zone')"
},
{
"columnName": "label_url",
"transformFunction": "JSONPATHSTRING(after, '$.label_url')"
},
{
"columnName": "manifest_url",
"transformFunction": "JSONPATHSTRING(after, '$.manifest_url')"
},
{
"columnName": "is_locked",
"transformFunction": "JSONPATHSTRING(after, '$.is_locked')"
},
{
"columnName": "customer_gstin",
"transformFunction": "JSONPATHSTRING(after, '$.customer_gstin')"
},
{
"columnName": "eway_bill_number",
"transformFunction": "JSONPATHSTRING(after, '$.eway_bill_number')"
},
{
"columnName": "pod",
"transformFunction": "JSONPATHSTRING(after, '$.pod')"
},
{
"columnName": "frozen_weight",
"transformFunction": "JSONPATHDOUBLE(after, '$.frozen_weight')"
},
{
"columnName": "isd_code",
"transformFunction": "JSONPATHSTRING(after, '$.isd_code')"
},
{
"columnName": "seller_address",
"transformFunction": "JSONPATHSTRING(after, '$.seller_address')"
},
{
"columnName": "shipping_address",
"transformFunction": "JSONPATHSTRING(after, '$.shipping_address')"
},
{
"columnName": "customer_details",
"transformFunction": "JSONPATHSTRING(after, '$.customer_details')"
},
{
"columnName": "comment",
"transformFunction": "JSONPATHSTRING(after, '$.comment')"
},
{
"columnName": "others",
"transformFunction": "JSONPATHSTRING(after, '$.others')"
},
{
"columnName": "entry_tax",
"transformFunction": "JSONPATHDOUBLE(after, '$.entry_tax')"
},
{
"columnName": "cost",
"transformFunction": "JSONPATHDOUBLE(after, '$.cost')"
},
{
"columnName": "tax",
"transformFunction": "JSONPATHDOUBLE(after, '$.tax')"
},
{
"columnName": "cod_charges",
"transformFunction": "JSONPATHDOUBLE(after, '$.cod_charges')"
},
{
"columnName": "total",
"transformFunction": "JSONPATHDOUBLE(after, '$.total')"
},
{
"columnName": "invoice_date",
"transformFunction": "JSONPATHLONG(after, '$.invoice_date')"
},
{
"columnName": "awb_assign_date",
"transformFunction": "JSONPATHLONG(after, '$.awb_assign_date')"
},
{
"columnName": "pickup_generated_date",
"transformFunction": "JSONPATHLONG(after, '$.pickup_generated_date')"
},
{
"columnName": "pickup_scheduled_date",
"transformFunction": "JSONPATHLONG(after, '$.pickup_scheduled_date')"
},
{
"columnName": "out_for_pickup_date",
"transformFunction": "JSONPATHSTRING(after, '$.out_for_pickup_date')"
},
{
"columnName": "created_at",
"transformFunction": "JSONPATHSTRING(after, '$.created_at')"
},
{
"columnName": "updated_at",
"transformFunction": "JSONPATHSTRING(after, '$.updated_at')"
},
{
"columnName": "rto_initiated_date",
"transformFunction": "JSONPATHSTRING(after, '$.rto_initiated_date')"
},
{
"columnName": "rto_delivered_date",
"transformFunction": "JSONPATHSTRING(after, '$.rto_delivered_date')"
},
{
"columnName": "updated_on",
"transformFunction": "JSONPATHSTRING(after, '$.updated_on')"
},
{
"columnName": "etd",
"transformFunction": "JSONPATHLONG(after, '$.etd')"
},
{
"columnName": "promised_pickup_tat",
"transformFunction": "JSONPATHDOUBLE(after, '$.promised_pickup_tat')"
},
{
"columnName": "promised_delivery_tat",
"transformFunction": "JSONPATHDOUBLE(after, '$.promised_delivery_tat')"
},
{
"columnName": "promised_rto_tat",
"transformFunction": "JSONPATHDOUBLE(after, '$.promised_rto_tat')"
},
{
"columnName": "promised_cod_remittance_tat",
"transformFunction": "JSONPATHDOUBLE(after, '$.promised_cod_remittance_tat')"
},
{
"columnName": "shipped_date",
"transformFunction": "JSONPATHSTRING(after, '$.shipped_date')"
},
{
"columnName": "delivered_date",
"transformFunction": "JSONPATHSTRING(after, '$.delivered_date')"
},
{
"columnName": "returned_date",
"transformFunction": "JSONPATHSTRING(after, '$.returned_date')"
},
{
"columnName": "eway_bill_date",
"transformFunction": "JSONPATHSTRING(after, '$.eway_bill_date')"
},
{
"columnName": "mps_data",
"transformFunction": "JSONPATHSTRING(after, '$.mps_data')"
}
],
And my `schemaFile.json`:
{
"schemaName": "shipments",
"enableColumnBasedNullHandling": true,
"dimensionFieldSpecs": [
{ "name": "id", "dataType": "LONG" },
{ "name": "order_id", "dataType": "LONG" },
{ "name": "company_id", "dataType": "LONG" },
{ "name": "channel_id", "dataType": "LONG" },
{ "name": "invoice_no", "dataType": "STRING" },
{ "name": "encrypt_invoice_name", "dataType": "STRING" },
{ "name": "courier", "dataType": "STRING" },
{ "name": "sr_courier_id", "dataType": "INT" },
{ "name": "code", "dataType": "STRING" },
{ "name": "awb", "dataType": "STRING" },
{ "name": "pickup_token_number", "dataType": "STRING" },
{ "name": "pickup_address_id", "dataType": "LONG" },
{ "name": "return_pickup_address_id", "dataType": "INT" },
{ "name": "dhl_handover_id", "dataType": "STRING" },
{ "name": "dhl_handover_url", "dataType": "STRING" },
{ "name": "pickup_reshedule_count", "dataType": "INT" },
{ "name": "dhl_pickup_url", "dataType": "STRING" },
{ "name": "method", "dataType": "STRING" },
{ "name": "channel_shipment_id", "dataType": "STRING" },
{ "name": "weight", "dataType": "STRING" },
{ "name": "dimensions", "dataType": "STRING" },
{ "name": "volumetric_weight", "dataType": "STRING" },
{ "name": "quantity", "dataType": "INT" },
{ "name": "status", "dataType": "INT" },
{ "name": "state_type", "dataType": "INT" },
{ "name": "sub_status", "dataType": "INT" },
{ "name": "status_code", "dataType": "STRING" },
{ "name": "shipment_zone", "dataType": "INT" },
{ "name": "label_url", "dataType": "STRING" },
{ "name": "manifest_url", "dataType": "STRING" },
{ "name": "is_locked", "dataType": "INT" },
{ "name": "customer_gstin", "dataType": "STRING" },
{ "name": "eway_bill_number", "dataType": "STRING" },
{ "name": "pod", "dataType": "STRING" },
{ "name": "frozen_weight", "dataType": "INT" },
{ "name": "isd_code", "dataType": "STRING" },
{ "name": "seller_address", "dataType": "STRING" },
{ "name": "shipping_address", "dataType": "STRING" },
{ "name": "customer_details", "dataType": "STRING" },
{ "name": "comment", "dataType": "STRING" },
{ "name": "others", "dataType": "STRING" },
{ "name": "mps_data", "dataType": "STRING" }
],
"metricFieldSpecs": [
{ "name": "entry_tax", "dataType": "DOUBLE", "defaultNullValue": null },
{ "name": "cost", "dataType": "DOUBLE", "defaultNullValue": null },
{ "name": "tax", "dataType": "DOUBLE", "defaultNullValue": null },
{ "name": "cod_charges", "dataType": "DOUBLE", "defaultNullValue": null },
{ "name": "total", "dataType": "DOUBLE", "defaultNullValue": null }
],
"dateTimeFieldSpecs": [
{ "name": "invoice_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
{ "name": "awb_assign_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
{ "name": "pickup_generated_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
{ "name": "pickup_scheduled_date", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
{ "name": "out_for_pickup_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "created_at", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "updated_at", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "rto_initiated_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "rto_delivered_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "updated_on", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "etd", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
{ "name": "promised_pickup_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "promised_delivery_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "promised_rto_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "promised_cod_remittance_tat", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "shipped_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "delivered_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "returned_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" },
{ "name": "eway_bill_date", "dataType": "STRING", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:DAYS" }
]
}
Please let me know if you need any further information to debug this issue.
I have tried declaring these columns with the DOUBLE data type in the schema and extracting them with JSONPATHDOUBLE, but all the values arriving in Pinot are null.
Upvotes: 0
Views: 14