DebianUser

Reputation: 23

The stream created in ksqlDB shows NULL value

I am trying to create a stream in ksqlDB to read data from a Kafka topic and run queries on it.

CREATE STREAM test_location (
  id VARCHAR,
  name VARCHAR,
  location VARCHAR
  )

 WITH (KAFKA_TOPIC='public.location',
       VALUE_FORMAT='JSON',
       PARTITIONS=10);

The data in the topic public.location is in JSON format.

Updated: here is a message printed from the topic.

print 'public.location' from beginning limit 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/23 11:27:39.429 Z, key: <null>, value: {"sourceTable":{"id":"1","name":Sam,"location":Manchester,"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null},"ConnectorVersion":null,"connectorId":null,"ConnectorName":null,"DbName":null,"DbSchema":null,"TableName":null,"payload":null,"schema":null}, partition: 3

After creating the stream, a SELECT on it returns NULL for every column, although the topic contains data.

select * from test_location
>EMIT CHANGES limit 5;
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|ID                                                               |NAME                                                            |LOCATION                                                          |
+-----------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
|null                                                             |null                                                             |null                                                             |
Limit Reached
Query terminated

Here are the details from the Docker Compose file:

version: '2'

services:

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.18.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      # Configuration to embed Kafka Connect support.
      KSQL_CONNECT_GROUP_ID: "ksql-connect-01"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "_ksql-connect-01-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "_ksql-connect-01-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "_ksql-connect-01-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"

Update: Here is a message from the topic as I see it in Kafka:

{
   "sourceTable": {
      "id": "1",
      "name": Sam,
      "location": Manchester,
      "ConnectorVersion": null,
      "connectorId": null,
      "ConnectorName": null,
      "DbName": null,
      "DbSchema": null,
      "TableName": null,
      "payload": null,
      "schema": null
   },
   "ConnectorVersion": null,
   "connectorId": null,
   "ConnectorName": null,
   "DbName": null,
   "DbSchema": null,
   "TableName": null,
   "payload": null,
   "schema": null
}

Which step or configuration am I missing?

Upvotes: 2

Views: 2129

Answers (1)

Matthias J. Sax

Reputation: 62285

Given your payload, you need to declare the schema as nested, because id, name, and location are not top-level fields in the JSON; they are nested within sourceTable.

CREATE STREAM est_location (
  sourceTable STRUCT<id VARCHAR, name VARCHAR, location VARCHAR>
) WITH (KAFKA_TOPIC='public.location',
        VALUE_FORMAT='JSON');

It's not possible to "unwrap" the data when defining the schema; the schema must match what is in the topic. In addition to sourceTable, you could also add ConnectorVersion etc. to the schema, as they are also top-level fields in your JSON. The bottom line is that a column in ksqlDB can only be declared for a top-level field. Everything else is nested data that you can access using the STRUCT type.
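
As a rough sketch of the point about additional top-level fields, a declaration could look like the following. The stream name est_location_full is hypothetical, and the VARCHAR types for ConnectorVersion and DbName are an assumption, since the sample message only shows null for those fields.

```sql
-- Hypothetical stream name; the column types for the two extra
-- top-level fields are assumed, because the sample message only
-- contains null values for them.
CREATE STREAM est_location_full (
  sourceTable STRUCT<id VARCHAR, name VARCHAR, location VARCHAR>,
  ConnectorVersion VARCHAR,
  DbName VARCHAR
) WITH (KAFKA_TOPIC='public.location',
        VALUE_FORMAT='JSON');
```

Top-level fields that are not declared are simply ignored when the stream is read.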

Later, when you query est_location, you can refer to individual fields via sourceTable->id, etc.
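
For example, a query on the nested fields might look like this, assuming the stream est_location has been created over public.location. The SET statement is optional; it is included here only so that the query also reads messages already in the topic.

```sql
-- Read existing messages too, not only newly arriving ones.
SET 'auto.offset.reset' = 'earliest';

-- The -> operator dereferences a field inside a STRUCT column.
SELECT sourceTable->id       AS id,
       sourceTable->name     AS name,
       sourceTable->location AS location
FROM est_location
EMIT CHANGES
LIMIT 5;
```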

It would also be possible to declare a derived STREAM if you want to unnest the schema:

CREATE STREAM unnested_est_location AS
  SELECT sourceTable->id AS id,
         sourceTable->name AS name,
         sourceTable->location AS location
  FROM est_location;

Of course, this would write the data into a new topic.
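
If you want to choose the name of that new topic yourself, a WITH clause can be placed before AS SELECT. The following is an alternative form of the statement above, given as a minimal sketch; the topic name public.location.unnested is made up for illustration.

```sql
-- Alternative to the derived stream above, with an explicit sink topic.
-- The topic name is a hypothetical example.
CREATE STREAM unnested_est_location
  WITH (KAFKA_TOPIC='public.location.unnested',
        VALUE_FORMAT='JSON') AS
  SELECT sourceTable->id       AS id,
         sourceTable->name     AS name,
         sourceTable->location AS location
  FROM est_location;
```

Without the WITH clause, ksqlDB derives a default topic name from the stream name.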

Upvotes: 3
