Tom
Tom

Reputation: 6342

Different results when reading messages written in Kafka with upsert-kafka format

I am using following three test cases to test the behavior of upsert-kafka

  1. Write the aggregation results into kafka with upsert-kafka format (TestCase1)
  2. Using fink table result print to output the messages.(TestCase2)
  3. Consume the Kafka Messages directly with the consume-console.sh tool.(TestCase3)

I found that when using fink table result print, it prints two messages with -U and +U to indicate that one is deleted, and the other is inserted, and for the consume-console, it prints the result correctly and directly.

I would ask why fink table result print behaves what I have observed

Where does -U and +U (delete message and insert message) come from, are they saved in Kafka as two messages? I think the answer is NO, because I didn't see these immediate results. when consuming with consumer-console.

package org.example.official.sql

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.example.model.Stock
import org.example.sources.StockSource
import org.scalatest.funsuite.AnyFunSuite

class UpsertKafkaTest extends AnyFunSuite {
  val topic = "test-UpsertKafkaTest-1"
  //Test Case 1
  test("write to upsert kafka: upsert-kafka as sink") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val ds: DataStream[Stock] = env.addSource(new StockSource(emitInterval = 1500, print = false))
    ds.print()
    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("sourceTable", ds)
    val ddl =
      s"""
              CREATE TABLE sinkTable (
                id STRING,
                total_price DOUBLE,
                PRIMARY KEY (id) NOT ENFORCED
              ) WITH (
                'connector' = 'upsert-kafka',
                'topic' = '$topic',
                'properties.bootstrap.servers' = 'localhost:9092',
                'key.format' = 'json',
                'value.format' = 'json'
              )
      """.stripMargin(' ')

    tenv.executeSql(ddl)

    tenv.executeSql(
      """
      insert into sinkTable
      select id, sum(price)
      from sourceTable
      group by id

     """.stripMargin(' '))

    env.execute()
  }

//Test Case 2  
  test("read from upsert kafka: upsert-kafka as source 2") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)
    val ddl =
      s"""
              CREATE TABLE sourceTable (
                id STRING,
                total_price DOUBLE,
                PRIMARY KEY (`id`) NOT ENFORCED
              ) WITH (
                'connector' = 'upsert-kafka',
                'topic' = '$topic',
                'properties.bootstrap.servers' = 'localhost:9092',
                'properties.group.id' = 'testGroup001',
                'key.format' = 'json',
                'key.json.ignore-parse-errors' = 'true',

                'value.format' = 'json',
                'value.json.fail-on-missing-field' = 'false',
                'value.fields-include' = 'EXCEPT_KEY'
              )
      """.stripMargin(' ')

    tenv.executeSql(ddl)

    val result = tenv.executeSql(
      """
              select *  from sourceTable

     """.stripMargin(' '))

    result.print()

    /*
    +----+--------------------------------+--------------------------------+
  | op |                             id |                    total_price |
  +----+--------------------------------+--------------------------------+
  | +I |                            id1 |                            1.0 |
  | -U |                            id1 |                            1.0 |
  | +U |                            id1 |                            3.0 |
  | -U |                            id1 |                            3.0 |
  | +U |                            id1 |                            6.0 |
  | -U |                            id1 |                            6.0 |
  | +U |                            id1 |                           10.0 |
  | -U |                            id1 |                           10.0 |
  | +U |                            id1 |                           15.0 |
  | -U |                            id1 |                           15.0 |
  | +U |                            id1 |                           21.0 |
  | -U |                            id1 |                           21.0 |
  | +U |                            id1 |                           28.0 |
  | -U |                            id1 |                           28.0 |
  | +U |                            id1 |                           36.0 |
  | -U |                            id1 |                           36.0 |
  | +U |                            id1 |                           45.0 |
     */


  }
  
//Test Case 3  
test("read from upsert kafka with consumer console") {


    /*
      kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-UpsertKafkaTest-1 --from-beginning
      {"id":"id1","total_price":1.0}
      {"id":"id1","total_price":3.0}
      {"id":"id1","total_price":6.0}
      {"id":"id1","total_price":10.0}
      {"id":"id1","total_price":15.0}
      {"id":"id1","total_price":21.0}
      {"id":"id1","total_price":28.0}
      {"id":"id1","total_price":36.0}
      {"id":"id1","total_price":45.0}
     */
  }

 


}


Upvotes: 1

Views: 475

Answers (1)

David Anderson
David Anderson

Reputation: 43707

With Flink SQL we speak of the duality between tables and streams -- that a stream can be thought of as a (dynamic) table, and vice versa. There are two types of streams/tables: appending and updating. An append stream corresponds to a dynamic table that only performs INSERT operations; nothing is ever deleted or updated. And an update stream corresponds to a dynamic table where rows can be updated and deleted.

Your source table is an upsert-kafka table, and as such, is an update table (not an appending table). An upsert-kafka source corresponds to a compacted topic, and when compactions occur, that leads to updates/retractions where the existing values for various keys are updated over time.

When an updating table is converted into a stream, there are two possible results: you either get an upsert stream or a retraction stream. Some sinks support one or the other of these types of update streams, and some support both.

What you are seeing is that the upsert-kafka sink can handle upserts, and the print sink cannot. So the same update table is being fed to Kafka as a stream of upsert (and possibly deletion) events, and it's being sent to stdout as a stream with an initial insert (+I) for each key, followed by update_before/update_after pairs encoded as -U +U for each update (and deletions, were any to occur).

Upvotes: 3

Related Questions