Reputation: 539
I configured a connection to the database, and all the data is being transferred to the topic, because when I run the consumer it returns data.
How can I transform this topic into a table and persist the data inside KSQL?
Thanks very much!
Upvotes: 0
Views: 1137
Reputation: 32110
You don't persist data in KSQL. KSQL is simply an engine for querying and transforming data in Kafka. The source for KSQL queries is one or more Kafka topics, and the output of KSQL queries is either returned interactively or written back out to another Kafka topic.
If you have data in your Kafka topics (which it sounds like you do), run LIST TOPICS; in KSQL to see them:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
_confluent-metrics | false | 12 | 1 | 0 | 0
asgard.demo.accounts | false | 1 | 1 | 0 | 0
From there, pick your topic and run PRINT 'my-topic' FROM BEGINNING;
ksql> PRINT 'asgard.demo.accounts' FROM BEGINNING;
Format:AVRO
10/11/18 9:24:45 AM UTC, null, {"account_id": "a42", "first_name": "Robin", "last_name": "Moffatt", "email": "[email protected]", "phone": "+44 123 456 789", "address": "22 Acacia Avenue", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
10/11/18 9:24:45 AM UTC, null, {"account_id": "a081", "first_name": "Sidoney", "last_name": "Lafranconi", "email": "[email protected]", "phone": "+44 908 687 6649", "address": "40 Kensington Pass", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
10/11/18 9:24:45 AM UTC, null, {"account_id": "a135", "first_name": "Mick", "last_name": "Edinburgh", "email": "[email protected]", "phone": "+44 301 837 6535", "address": "27 Blackbird Lane", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
to see its contents. Press Ctrl-C to cancel the PRINT statement and return to the command line.
Note the Format on the output of the PRINT statement. This is the serialisation format of your data.
If the data's serialised in Avro then you can run:
CREATE STREAM mydata WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='AVRO');
If it's in JSON, you'll also need to specify the column names and data types:
CREATE STREAM mydata (col1 INT, col2 VARCHAR) WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='JSON');
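As an illustration, a JSON declaration mirroring the fields in the sample PRINT output above might look like the following. This is a sketch: `my-json-topic` and `mydata_json` are hypothetical names, and every column is declared as VARCHAR to match the types DESCRIBE reports for the Avro version below; use whatever types fit your own data.

```sql
-- Hypothetical JSON topic carrying the same fields as asgard.demo.accounts.
-- Each column corresponds to a field name in the JSON message value.
CREATE STREAM mydata_json (
  ACCOUNT_ID VARCHAR,
  FIRST_NAME VARCHAR,
  LAST_NAME VARCHAR,
  EMAIL VARCHAR,
  PHONE VARCHAR,
  ADDRESS VARCHAR,
  COUNTRY VARCHAR,
  CREATE_TS VARCHAR,
  UPDATE_TS VARCHAR,
  MESSAGETOPIC VARCHAR,
  MESSAGESOURCE VARCHAR
) WITH (KAFKA_TOPIC='my-json-topic', VALUE_FORMAT='JSON');
```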
Now that you've 'registered' this topic with KSQL, you can view its schema with DESCRIBE:
ksql> DESCRIBE mydata;
Name : MYDATA
Field | Type
-------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ACCOUNT_ID | VARCHAR(STRING)
FIRST_NAME | VARCHAR(STRING)
LAST_NAME | VARCHAR(STRING)
EMAIL | VARCHAR(STRING)
PHONE | VARCHAR(STRING)
ADDRESS | VARCHAR(STRING)
COUNTRY | VARCHAR(STRING)
CREATE_TS | VARCHAR(STRING)
UPDATE_TS | VARCHAR(STRING)
MESSAGETOPIC | VARCHAR(STRING)
MESSAGESOURCE | VARCHAR(STRING)
-------------------------------------------
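and then use KSQL to query and manipulate the data. Setting auto.offset.reset to earliest makes the query read the topic from the beginning rather than only new messages: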
ksql> SET 'auto.offset.reset'='earliest';
ksql> SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL FROM mydata WHERE COUNTRY='United Kingdom';
Robin Moffatt | [email protected]
Sidoney Lafranconi | [email protected]
Mick Edinburgh | [email protected]
Merrill Stroobant | [email protected]
Press Ctrl-C to cancel a SELECT query.
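If you'd rather not cancel the query by hand, a transient SELECT can also be given a LIMIT, after which KSQL stops it on its own. A sketch against the same stream:

```sql
-- Return at most two matching rows, then terminate the query automatically
SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL
FROM mydata
WHERE COUNTRY='United Kingdom'
LIMIT 2;
```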
KSQL can persist this to a new Kafka topic:
CREATE STREAM UK_USERS AS SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL FROM mydata WHERE COUNTRY='United Kingdom';
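By default the output topic is named after the stream and gets a default partition count (4 in the LIST TOPICS output below). If you want to control these, the persistent query accepts a WITH clause. A sketch, assuming your KSQL version supports these properties on CREATE STREAM ... AS SELECT; `UK_USERS_JSON` and `uk_users_json` are hypothetical names:

```sql
-- Write the same filtered data to an explicitly named topic,
-- serialised as JSON, with a single partition
CREATE STREAM UK_USERS_JSON
  WITH (KAFKA_TOPIC='uk_users_json', VALUE_FORMAT='JSON', PARTITIONS=1) AS
  SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL
  FROM mydata
  WHERE COUNTRY='United Kingdom';
```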
If you list your topics again, you'll see the new one, created and populated:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
_confluent-metrics | false | 12 | 1 | 0 | 0
asgard.demo.accounts | true | 1 | 1 | 2 | 2
UK_USERS | true | 4 | 1 | 0 | 0
---------------------------------------------------------------------------------------------------------
ksql>
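To check what was written, the same PRINT statement used earlier works on the new topic too (press Ctrl-C to stop it):

```sql
-- Inspect the derived topic from its first message
PRINT 'UK_USERS' FROM BEGINNING;
```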
Every event coming into the source topic (asgard.demo.accounts) gets read and filtered by KSQL and written to the target topic (UK_USERS) based on the SQL you've executed.
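Since the question asks about tables: a KSQL table is declared over a Kafka topic in the same way as a stream, so the data still lives in Kafka rather than in KSQL. One caveat is that a table needs each message's key to hold the row identifier, and the PRINT output above shows null keys. A sketch of the usual approach in KSQL releases of this era, assuming ACCOUNT_ID identifies each row (`MYDATA_BY_ID` and `ACCOUNTS` are hypothetical names; newer ksqlDB versions declare keys differently):

```sql
-- 1. Re-key the stream so each message's key is its ACCOUNT_ID.
--    This persistent query writes to a new topic, MYDATA_BY_ID.
CREATE STREAM MYDATA_BY_ID AS
  SELECT * FROM mydata PARTITION BY ACCOUNT_ID;

-- 2. Declare a table over the re-keyed topic: the latest message
--    for each key is that key's current row.
CREATE TABLE ACCOUNTS
  WITH (KAFKA_TOPIC='MYDATA_BY_ID', VALUE_FORMAT='AVRO', KEY='ACCOUNT_ID');
```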
For more information, see the KSQL syntax docs and tutorials.
Disclaimer: I work for Confluent, the company behind the open-source KSQL project.
Upvotes: 4