ciolo

Reputation: 147

How to rename/transform nested fields in json object using ElasticsearchSinkConnector

I'm using ElasticsearchSinkConnector to store data from Kafka topics to Elasticsearch indices. This is an example of a Kafka message:

{"ID" : "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
 "Timestamp" : "2020-11-02T12:05:57.87639003Z",
 "Type" : "CREATION",
 "PlaceType" : "home",
 "Location" : {
        "Lat" : 43.7575119,
        "Lon" : 11.2921363
      },
"Created" : "2020-11-02T12:05:57.876390266Z",
"LastUpdated" : "2020-11-02T12:05:57.876390398Z"}

I would like to represent the Location object as a geo_point in ES, but Lat/Lon must be lowercase to be recognised as geo_point fields. I am using ReplaceField$Value to rename Location to "location", but I cannot rename the nested fields, Lat and Lon. This is my snippet for renaming Location, Lat and Lon:

transforms: 'RenameField'
transforms.RenameField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames: 'Location:location,location.Lat:lat,location.Lon:lon'

The Location rename works, but the Lat/Lon renames don't. In brief, I'd like to have the following result in ES:

{"ID" : "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
 "Timestamp" : "2020-11-02T12:05:57.87639003Z",
 "Type" : "CREATION",
 "PlaceType" : "home",
 "location" : {
        "lat" : 43.7575119,
        "lon" : 11.2921363
      },
"Created" : "2020-11-02T12:05:57.876390266Z",
"LastUpdated" : "2020-11-02T12:05:57.876390398Z"}

UPDATE

Awesome, thank you very much. However, I ran into a problem creating my target stream in ksql-cli.

ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS "location"
>    FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS 'location'
>    FROM PLACES_EVENT;
line 3:64: mismatched input ''location'' expecting {'NO', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException

I also tried setting the STRUCT alias without quotes, but ksql throws the same error as the first one.

ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
>    SELECT *,
>        STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS GeoPointLocation
>    FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'

Can you help me?

Upvotes: 1

Views: 1966

Answers (1)

Robin Moffatt

Reputation: 32090

I hit this exact same problem - and I'm not aware of an existing Single Message Transform that can help. You have a couple of options:

  1. Write your own Single Message Transform to do this

  2. Use ksqlDB to wrangle the schema, which is the route I chose

     CREATE STREAM OUTPUT_STREAM AS
         SELECT *,
                STRUCT("lat" := LATITUDE, "lon" := LONGITUDE) AS "location"
         FROM SOURCE_STREAM
         EMIT CHANGES;
    
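To make option 1 concrete: the transformation a custom Single Message Transform would have to perform is simple to express. Here is a hedged Python sketch of just the value-mangling logic (the function name is made up for illustration; a real SMT would be a Java class implementing Kafka Connect's Transformation interface and applying this to each record value):

```python
def rename_location(value):
    """Rename Location -> location and lowercase its nested keys.

    Sketch of the per-record logic a custom SMT's apply() would run,
    assuming schemaless JSON values (plain dicts).
    """
    out = dict(value)
    loc = out.pop("Location", None)
    if isinstance(loc, dict):
        # Lowercase the nested keys so Elasticsearch accepts it as geo_point
        out["location"] = {k.lower(): v for k, v in loc.items()}
    return out

record = {
    "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
    "Location": {"Lat": 43.7575119, "Lon": 11.2921363},
}
print(rename_location(record)["location"])
# {'lat': 43.7575119, 'lon': 11.2921363}
```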

You will also want to create a mapping template to prepare the Elasticsearch index, if you haven't already.


To expand on the ksqlDB example:

  1. Populate source topic with the sample data:

    kafkacat -b localhost:9092 -P -t input_topic <<EOF
    { "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e", "Timestamp": "2020-11-02T12:05:57.87639003Z", "Type": "CREATION", "PlaceType": "home", "Location": { "Lat": 43.7575119, "Lon": 11.2921363 }, "Created": "2020-11-02T12:05:57.876390266Z", "LastUpdated": "2020-11-02T12:05:57.876390398Z" }
    EOF
    
  2. Taking the source topic input_topic, declare the ksqlDB STREAM object (which is basically a Kafka topic with a schema overlaid):

    CREATE STREAM SOURCE_STREAM (ID VARCHAR,
                                 Timestamp VARCHAR,
                                 Type VARCHAR,
                                 PlaceType VARCHAR,
                                 Location STRUCT<Lat DOUBLE, Lon DOUBLE>,
                                 Created VARCHAR,
                                 LastUpdated VARCHAR)
            WITH (KAFKA_TOPIC='input_topic',
                  VALUE_FORMAT='JSON');
    
  3. Confirm that the stream's schema is valid by selecting fields from the first message:

    ksql> SET 'auto.offset.reset' = 'earliest';
    >
    Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
    
    ksql> SELECT ID, PLACETYPE, LOCATION->LAT, LOCATION->LON FROM SOURCE_STREAM EMIT CHANGES LIMIT 1;
    +---------------------------------------+----------+-----------+-----------+
    |ID                                     |PLACETYPE |LAT        |LON        |
    +---------------------------------------+----------+-----------+-----------+
    |7d6203f4-3ae7-4daa-af03-71f98d619f7e   |home      |43.7575119 |11.2921363 |
    Limit Reached
    Query terminated
    
  4. Create a target stream, mapping the lat/lon fields to lower-case names. Here I'm also showing the alternative approach of concatenating them, which Elasticsearch will also accept:

    CREATE STREAM TARGET_STREAM WITH (KAFKA_TOPIC='target_topic') AS
        SELECT *, 
            STRUCT("lat" := LOCATION->LAT, "lon" := LOCATION->LON) AS "location_example_01",
            CAST(LOCATION->LAT AS VARCHAR)  + ',' + CAST(LOCATION->LON AS VARCHAR) AS "location_example_02"
        FROM SOURCE_STREAM;
    
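The two columns created above correspond to two of the geo_point input formats Elasticsearch accepts: an object with lowercase lat/lon keys, and a "lat,lon" string. As plain data, the two shapes look like this (a minimal Python sketch, using the values from the sample message):

```python
# The two geo_point representations produced by the target stream:
# location_example_01 (object form) and location_example_02 (string form).
lat, lon = 43.7575119, 11.2921363

as_object = {"lat": lat, "lon": lon}   # STRUCT form
as_string = f"{lat},{lon}"             # concatenated "lat,lon" form

print(as_object)   # {'lat': 43.7575119, 'lon': 11.2921363}
print(as_string)   # 43.7575119,11.2921363
```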
  5. Create an index template for Elasticsearch if the index does not already have the geo_point mapping declared. Here it'll match any index created whose name begins with target:

    curl --silent --show-error -XPUT -H 'Content-Type: application/json' \
        http://localhost:9200/_index_template/rmoff_template01/ \
        -d'{
            "index_patterns": [ "target*" ],
            "template": {
                "mappings": {
                    "properties": {
                        "location_example_01": {
                            "type": "geo_point"
                        },
                        "location_example_02": {
                            "type": "geo_point"
                        }
                    }
                }
            } }'
    
  6. Stream the data from Kafka to Elasticsearch using Kafka Connect. You can configure this using the native Kafka Connect REST API, or do it directly from ksqlDB itself:

    CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
    'connector.class'                     = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'topics'                              = 'target_topic',
    'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                     = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable'      = 'false',
    'connection.url'                      = 'http://elasticsearch:9200',
    'type.name'                           = '_doc',
    'key.ignore'                          = 'true',
    'schema.ignore'                       = 'true');
    
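For completeness, the equivalent submission via the Kafka Connect REST API would look something like the following. The host and port (localhost:8083) are an assumption based on the default Connect REST listener; adjust for your environment (this is a config fragment, so there is no runnable output to show):

```shell
# Submit (or update) the same connector config via Connect's REST API.
# PUT /connectors/{name}/config creates the connector if it doesn't exist.
curl -X PUT -H "Content-Type: application/json" \
    http://localhost:8083/connectors/SINK_ELASTIC_01/config \
    -d '{
        "connector.class"                : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics"                         : "target_topic",
        "key.converter"                  : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable" : "false",
        "connection.url"                 : "http://elasticsearch:9200",
        "type.name"                      : "_doc",
        "key.ignore"                     : "true",
        "schema.ignore"                  : "true"
    }'
```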
  7. Inspect the mappings in the new Elasticsearch index:

    curl -XGET --silent --show-error "http://localhost:9200/target_topic/_mappings" | jq '.'
    {
      "target_topic": {
        "mappings": {
          "properties": {
            "CREATED": {
              "type": "date"
            },
            "ID": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "LASTUPDATED": {
              "type": "date"
            },
            "LOCATION": {
              "properties": {
                "LAT": {
                  "type": "float"
                },
                "LON": {
                  "type": "float"
                }
              }
            },
            "PLACETYPE": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "TIMESTAMP": {
              "type": "date"
            },
            "TYPE": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "location_example_01": {
              "type": "geo_point"
            },
            "location_example_02": {
              "type": "geo_point"
            }
          }
        }
      }
    }
    
  8. View the data


Upvotes: 3
