Reputation: 147
I'm using ElasticsearchSinkConnector to store data from Kafka topics in Elasticsearch indices. This is an example of a Kafka message:
{"ID" : "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
"Timestamp" : "2020-11-02T12:05:57.87639003Z",
"Type" : "CREATION",
"PlaceType" : "home",
"Location" : {
"Lat" : 43.7575119,
"Lon" : 11.2921363
},
"Created" : "2020-11-02T12:05:57.876390266Z",
"LastUpdated" : "2020-11-02T12:05:57.876390398Z"}
I would like to represent the Location object as a geo_point in Elasticsearch, but Lat/Lon must be lowercase (lat/lon) for the geo_point mapping to apply. I am using ReplaceField$Value to rename Location to "location", but I cannot rename the nested fields Lat and Lon. This is my snippet for renaming Location, Lat and Lon:
transforms: 'RenameField'
transforms.RenameField.type: org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames: 'Location:location,location.Lat:lat,location.Lon:lon'
Renaming Location works, but Lat/Lon are not renamed. In brief, I'd like to end up with the following document in Elasticsearch:
{"ID" : "7d6203f4-3ae7-4daa-af03-71f98d619f7e",
"Timestamp" : "2020-11-02T12:05:57.87639003Z",
"Type" : "CREATION",
"PlaceType" : "home",
"location" : {
"lat" : 43.7575119,
"lon" : 11.2921363
},
"Created" : "2020-11-02T12:05:57.876390266Z",
"LastUpdated" : "2020-11-02T12:05:57.876390398Z"}
UPDATE
Awesome, thank you very much. However, I ran into a problem creating my target stream in the ksql CLI:
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
> SELECT *,
> STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS "location"
> FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
> SELECT *,
> STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS 'location'
> FROM PLACES_EVENT;
line 3:64: mismatched input ''location'' expecting {'NO', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException
I also tried setting the STRUCT alias without quotes, but ksql throws the same error as in the first attempt:
ksql> CREATE STREAM ES_PLACES_EVENT WITH (KAFKA_TOPIC='es-places-event') AS
> SELECT *,
> STRUCT('lat' = LOCATION->LAT, 'lon'= LOCATION->LON) AS GeoPointLocation
> FROM PLACES_EVENT;
Can't find any functions with the name 'STRUCT'
Can you help me?
Upvotes: 1
Views: 1966
Reputation: 32090
I hit this exact same problem - and I'm not aware of an existing Single Message Transform that can help. You have a couple of options:
Write your own Single Message Transform to do this
Use ksqlDB to wrangle the schema, which is the route I chose
CREATE STREAM OUTPUT_STREAM AS
SELECT *,
STRUCT("lat" := LATITUDE, "lon":= LONGITUDE) AS "location"
FROM SOURCE_STREAM
EMIT CHANGES;
You will also want to create a mapping template to prepare the Elasticsearch index, if you haven't already (shown below).
To expand on the ksqlDB example:
Populate the source topic with the sample data:
kafkacat -b localhost:9092 -P -t input_topic <<EOF
{ "ID": "7d6203f4-3ae7-4daa-af03-71f98d619f7e", "Timestamp": "2020-11-02T12:05:57.87639003Z", "Type": "CREATION", "PlaceType": "home", "Location": { "Lat": 43.7575119, "Lon": 11.2921363 }, "Created": "2020-11-02T12:05:57.876390266Z", "LastUpdated": "2020-11-02T12:05:57.876390398Z" }
EOF
Taking a source topic of input_topic, declare the ksqlDB STREAM object (which is basically a Kafka topic with a schema overlaid):
CREATE STREAM SOURCE_STREAM (ID VARCHAR,
Timestamp VARCHAR,
Type VARCHAR,
PlaceType VARCHAR,
Location STRUCT<Lat DOUBLE, Lon DOUBLE>,
Created VARCHAR,
LastUpdated VARCHAR)
WITH (KAFKA_TOPIC='input_topic',
VALUE_FORMAT='JSON');
Confirm that the stream's schema is valid by selecting fields from the first message:
ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ID, PLACETYPE, LOCATION->LAT, LOCATION->LON FROM SOURCE_STREAM EMIT CHANGES LIMIT 1;
+---------------------------------------+----------+-----------+-----------+
|ID |PLACETYPE |LAT |LON |
+---------------------------------------+----------+-----------+-----------+
|7d6203f4-3ae7-4daa-af03-71f98d619f7e |home |43.7575119 |11.2921363 |
Limit Reached
Query terminated
Create a target stream, mapping the lat/lon fields to lower-case names. Here I'm also showing the alternative approach of concatenating them, which Elasticsearch will also accept:
CREATE STREAM TARGET_STREAM WITH (KAFKA_TOPIC='target_topic') AS
SELECT *,
STRUCT("lat" := LOCATION->LAT, "lon":= LOCATION->LON) AS "location_example_01",
CAST(LOCATION->LAT AS VARCHAR) + ',' + CAST(LOCATION->LON AS VARCHAR) AS "location_example_02"
FROM SOURCE_STREAM;
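Optionally, sanity-check the new topic before wiring up the sink by consuming a message from it (this assumes the same broker and kafkacat as above; jq is only there to pretty-print the value):
kafkacat -b localhost:9092 -C -t target_topic -o beginning -c 1 | jq '.'
You should see the original fields plus the lower-case "location_example_01" and "location_example_02" fields.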
Create an index template for Elasticsearch if the index does not already have the geo_point mapping declared. Here it will match any index created whose name begins with target:
curl --silent --show-error -XPUT -H 'Content-Type: application/json' \
http://localhost:9200/_index_template/rmoff_template01/ \
-d'{
"index_patterns": [ "target*" ],
"template": {
"mappings": {
"properties": {
"location_example_01": {
"type": "geo_point"
},
"location_example_02": {
"type": "geo_point"
}
}
}
} }'
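To double-check that the template was registered, you can fetch it back (assuming the same Elasticsearch address as above):
curl --silent --show-error -XGET http://localhost:9200/_index_template/rmoff_template01 | jq '.'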
Stream the data from Kafka to Elasticsearch using Kafka Connect. You can configure this using the native Kafka Connect REST API (an equivalent call is shown after the ksqlDB statement below), or do it directly from ksqlDB itself:
CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'topics' = 'target_topic',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'connection.url' = 'http://elasticsearch:9200',
'type.name' = '_doc',
'key.ignore' = 'true',
'schema.ignore' = 'true');
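For reference, the equivalent configuration submitted through Kafka Connect's native REST API would look roughly like this (this assumes the Connect worker's REST interface on its default port 8083; adjust the host and connector name as needed):
curl --silent --show-error -XPUT -H 'Content-Type: application/json' \
    http://localhost:8083/connectors/SINK_ELASTIC_01/config \
    -d'{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "target_topic",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
    }'
Either way, you can confirm the connector is running before moving on:
curl --silent --show-error http://localhost:8083/connectors/SINK_ELASTIC_01/status | jq '.'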
Inspect the mappings in the new Elasticsearch index:
curl -XGET --silent --show-error "http://localhost:9200/target_topic/_mappings" | jq '.'
{
"target_topic": {
"mappings": {
"properties": {
"CREATED": {
"type": "date"
},
"ID": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"LASTUPDATED": {
"type": "date"
},
"LOCATION": {
"properties": {
"LAT": {
"type": "float"
},
"LON": {
"type": "float"
}
}
},
"PLACETYPE": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"TIMESTAMP": {
"type": "date"
},
"TYPE": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"location_example_01": {
"type": "geo_point"
},
"location_example_02": {
"type": "geo_point"
}
}
}
}
}
View the data in the new index.
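A quick search against the index (same Elasticsearch address as above) should return the documents with the lower-case location fields:
curl --silent --show-error http://localhost:9200/target_topic/_search | jq '.hits.hits[]._source'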
Upvotes: 3