Carlos Bernal

Reputation: 11

Flink: join Kafka source with CDC source into a Kafka sink

We are trying to join a DB CDC connector table (upsert behavior) with a 'kafka' source of events, to enrich those events by key with the existing CDC data: kafka-source (id, B, C) + cdc (id, D, E, F) = result (id, B, C, D, E, F), written to a kafka sink (append-only).

INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id)
SELECT source.payload.zapatos, source.payload.naranjas, source.device_id, account_id, user_id
FROM source
JOIN mongodb_source ON source.device_id = mongodb_source._id

The problem is that this only works if our Kafka sink is 'upsert-kafka', but that sink emits tombstones when rows are deleted in the DB. We need the output to behave as plain events, not a changelog, yet we cannot use the plain 'kafka' sink either, because the CDC connector produces an upsert stream, which is not compatible with an append-only sink...

What would be the right way to do this? Transform the upsert stream into plain append events?
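For reference, the plain append-only sink we would like to write to instead looks roughly like this (sketch only; named sink_append here just for illustration). Flink rejects an INSERT INTO such a table from the join above, because the join against the CDC table produces update and delete changes:

CREATE TABLE sink_append (
    `zapatos` INT,
    `naranjas` STRING,
    `account_id` STRING,
    `user_id` STRING,
    `device_id` STRING,
    `time28` INT
) WITH (
    'connector' = 'kafka',  -- plain append-only Kafka sink, no primary key
    'topic' = 'as-test-output-flink-topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
)

This is the job as it currently runs (working only with the 'upsert-kafka' sink):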

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)
# use the blink table planner in streaming mode
st_env = StreamTableEnvironment \
    .create(s_env, environment_settings=EnvironmentSettings
            .new_instance()
            .in_streaming_mode()
            .use_blink_planner().build())

   ddl = """CREATE TABLE sink (
            `zapatos` INT,
            `naranjas` STRING,
            `account_id` STRING,
            `user_id` STRING,
            `device_id` STRING,
            `time28` INT,
            PRIMARY KEY (device_id) NOT ENFORCED
        ) WITH (
            'connector' = 'upsert-kafka',
            'topic' = 'as-test-output-flink-topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'testGroup',
            'key.format' = 'raw',
            'value.format' = 'json',
            'value.fields-include' = 'EXCEPT_KEY'
        )
        """
    st_env.sql_update(ddl)
    
    ddl = """CREATE TABLE source (
            `device_id` STRING,
            `timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
            `event_type` STRING,
            `payload` ROW<`zapatos` INT, `naranjas` STRING, `time28` INT, `device_id` STRING>,
            `trace_id` STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'as-test-input-flink-topic',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'testGroup',
            'key.format' = 'raw',
            'key.fields' = 'device_id',
            'value.format' = 'json',
            'value.fields-include' = 'EXCEPT_KEY'
        )
        """
    st_env.sql_update(ddl)
    
    ddl = """
    CREATE TABLE mongodb_source (
        `_id` STRING PRIMARY KEY, 
        `account_id` STRING,
        `user_id` STRING,
        `device_id` STRING
    ) WITH (
        'connector' = 'mongodb-cdc',
        'uri' = '******',
        'database' = '****',
        'collection' = 'testflink'
    )
    """
    st_env.sql_update(ddl)


    st_env.sql_update("""
        INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id) 
        SELECT zapatos, naranjas, source.device_id, account_id, user_id FROM source 
        JOIN mongodb_source ON source.device_id = mongodb_source._id
     """)

# execute
    st_env.execute("kafka_to_kafka")

Don't mind the mongodb-cdc connector; it is new, but it works like the mysql-cdc or postgres-cdc connectors.

Thanks for your help!

Upvotes: 1

Views: 676

Answers (1)

Hako

Reputation: 381

Have you tried using a LEFT JOIN instead of a JOIN? It shouldn't create tombstones then, if your purpose is just to enrich the Kafka events with whatever matching data exists in Mongo…
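A rough sketch of that rewrite, reusing the tables and columns from the question (untested):

INSERT INTO sink (zapatos, naranjas, device_id, account_id, user_id)
SELECT source.payload.zapatos, source.payload.naranjas, source.device_id, account_id, user_id
FROM source
LEFT JOIN mongodb_source ON source.device_id = mongodb_source._id

With the LEFT JOIN, events that have no matching Mongo document are still emitted, with account_id and user_id left as NULL.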

Upvotes: 1
