Reputation: 23
I am trying to create a stream in ksqlDB to get the data from the kafka topic and perform query on it.
CREATE STREAM test_location (
id VARCHAR,
name VARCHAR,
location VARCHAR
)
WITH (KAFKA_TOPIC='public.location',
VALUE_FORMAT='JSON',
PARTITIONS=10);
The data in the topics public.location is in JSON format.
UPDATED topic message.
print 'public.location' from beginning limit 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/23 11:27:39.429 Z, key: <null>, value: {"sourceTable":{"id":"1","name":Sam,"location":Manchester,"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null},"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null}, partition: 3
After the stream is created, and performing SELECT on the created stream I get NULL in the output. Although the topic has the data.
select * from test_location
>EMIT CHANGES limit 5;
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|ID |NAME |LOCATION |
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|null |null |null |
|null |null |null |
|null |null |null |
|null |null |null |
|null |null |null |
Limit Reached
Query terminated
Here is the details from docker file
version: '2'
services:
ksqldb-server:
image: confluentinc/ksqldb-server:0.18.0
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- schema-registry
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
# Configuration to embed Kafka Connect support.
KSQL_CONNECT_GROUP_ID: "ksql-connect-01"
KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-01-configs"
KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-01-offsets"
KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-01-statuses"
KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
Update: Here is a message in the topic that I see in the Kafka
{
"sourceTable": {
"id": "1",
"name": Sam,
"location": Manchester,
"ConnectorVersion": null,
"connectorId": null,
"ConnectorName": null,
"DbName": null,
"DbSchema": null,
"TableName": null,
"payload": null,
"schema": null
},
"ConnectorVersion": null,
"connectorId": null,
"ConnectorName": null,
"DbName": null,
"DbSchema": null,
"TableName": null,
"payload": null,
"schema": null
}
Which step or configuration I am missing?
Upvotes: 2
Views: 2129
Reputation: 62285
Given your payload, you would need to declare the schema nested, because id
, name
, and location
are not "top level" fields in the Json, but they are nested within sourceTable
.
CREATE STREAM est_location (
sourceTable STRUCT<id VARCHAR, name VARCHAR, location VARCHAR>
)
It's not possible to "unwrap" the data when defining the schema, but the schema must match what is in the topic. In addition to sourceTable
you could also add ConnectorVersion
etc to the schema, as they are also "top level" fields in you JSON. Bottom line is, that column in ksqlDB can only be declared on top level field. Everything else is nested data that you can access using STRUCT
type.
Of course later, when you query est_location
you can refer to individual fields via sourceTable->id
etc.
It would also be possible to declare a derived STREAM if you want to unnest the schema:
CREATE STREAM unnested_est_location AS
SELECT sourceTable->id AS id,
sourceTable->name AS name,
sourceTable->location AS location
FROM est_location;
Of course, this would write the data into a new topic.
Upvotes: 3