AaronLi

Reputation: 23

Is it possible to sink Kafka messages generated by Debezium to Snowflake?

I'm using the debezium-ui repo to test the Debezium MySQL CDC feature. The messages stream into Kafka normally. The request body used to create the MySQL connector is as follows:

    {
      "name": "inventory-connector",  
      "config": {  
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "tasks.max": "1",  
          "database.hostname": "dbzui-db-mysql",  
          "database.port": "3306",
          "database.user": "mysqluser",
          "database.password": "mysql",
          "database.server.id": "184054",  
          "database.server.name": "inventory-connector-mysql",  
          "database.include.list": "inventory",  
          "database.history.kafka.bootstrap.servers": "dbzui-kafka:9092",  
          "database.history.kafka.topic": "dbhistory.inventory"  
      }
    }

I then need to sink the Kafka messages into Snowflake, the data warehouse my team uses. I created a Snowflake sink connector with the following request body:

    {
        "name": "kafka2-04",
        "config": {
            "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
            "tasks.max": 1,
            "topics": "inventory-connector-mysql.inventory.orders",
            "snowflake.topic2table.map": "inventory-connector-mysql.inventory.orders:tbl_orders",
            "snowflake.url.name": "**.snowflakecomputing.com",
            "snowflake.user.name": "kafka_connector_user_1",
            "snowflake.private.key": "*******",
            "snowflake.private.key.passphrase": "",
            "snowflake.database.name": "kafka_db",
            "snowflake.schema.name": "kafka_schema",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
            "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
            "value.converter.schemas.enable": "true"
        }
    }

But after it runs, the data that lands in Snowflake looks like this: data in snowflake. The schema of the Snowflake table is different from the MySQL table's. Is my sink connector config incorrect, or is it impossible to sink Kafka data generated by Debezium with the SnowflakeSinkConnector?

Upvotes: 0

Views: 798

Answers (1)

Sergiu

Reputation: 4598

This is the default behavior in Snowflake, and it is documented here:

Every Snowflake table loaded by the Kafka connector has a schema consisting of two VARIANT columns:

- RECORD_CONTENT. This contains the Kafka message.

- RECORD_METADATA. This contains metadata about the message, for example, the topic from which the message was read.

If Snowflake creates the table, then the table contains only these two columns. If the user creates the table for the Kafka Connector to add rows to, then the table can contain more than these two columns (any additional columns must allow NULL values because data from the connector does not include values for those columns).
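For illustration, here is a minimal sketch of working with that layout. The table, column, and field names are assumptions based on the Debezium inventory example, and it assumes the whole Debezium change event lands in RECORD_CONTENT (with `value.converter.schemas.enable` set to `"true"`, the event is wrapped in a `payload` field):

    -- Pre-creating the target table: the connector fills only the two
    -- VARIANT columns, so any extra columns must allow NULLs or have a default.
    CREATE TABLE kafka_db.kafka_schema.tbl_orders (
        RECORD_CONTENT  VARIANT,
        RECORD_METADATA VARIANT,
        load_ts TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()  -- hypothetical extra column
    );

    -- Recovering a relational shape from the Debezium envelope with Snowflake's
    -- path syntax; the fields under "after" mirror the MySQL columns.
    SELECT
        RECORD_CONTENT:payload.after.order_number::NUMBER AS order_number,
        RECORD_CONTENT:payload.after.purchaser::NUMBER    AS purchaser,
        RECORD_CONTENT:payload.after.quantity::NUMBER     AS quantity,
        RECORD_CONTENT:payload.after.product_id::NUMBER   AS product_id
    FROM kafka_db.kafka_schema.tbl_orders
    -- skip delete events, whose "after" field is null
    WHERE RECORD_CONTENT:payload.op::STRING IN ('c', 'r', 'u');

A view over a query like this is a common way to expose the CDC stream in the MySQL table's shape without changing the connector config; the connector itself will always write the two-VARIANT layout.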

Upvotes: 1
