I am using the following three test cases to test the behavior of upsert-kafka: writing to Kafka with the upsert-kafka format (TestCase1), reading the topic back and using the Flink table result print to output the messages (TestCase2), and consuming the topic with kafka-console-consumer (TestCase3).
I found that when using the Flink table result print, it prints two messages, -U and +U, for each update, to indicate that one row is deleted and the other is inserted, whereas the console consumer prints each result directly.
I would like to ask why the Flink table result print behaves this way.
Where do -U and +U (the delete message and the insert message) come from? Are they saved in Kafka as two messages? I think the answer is no, because I didn't see these intermediate results when consuming with the console consumer.
package org.example.official.sql

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.example.model.Stock
import org.example.sources.StockSource
import org.scalatest.funsuite.AnyFunSuite

class UpsertKafkaTest extends AnyFunSuite {
  val topic = "test-UpsertKafkaTest-1"

  //Test Case 1
  test("write to upsert kafka: upsert-kafka as sink") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val ds: DataStream[Stock] = env.addSource(new StockSource(emitInterval = 1500, print = false))
    ds.print()
    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("sourceTable", ds)
    val ddl =
      s"""
      CREATE TABLE sinkTable (
        id STRING,
        total_price DOUBLE,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = '$topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'json',
        'value.format' = 'json'
      )
      """.stripMargin(' ')
    tenv.executeSql(ddl)
    tenv.executeSql(
      """
      insert into sinkTable
      select id, sum(price)
      from sourceTable
      group by id
      """.stripMargin(' '))
    env.execute()
  }

  //Test Case 2
  test("read from upsert kafka: upsert-kafka as source 2") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)
    val ddl =
      s"""
      CREATE TABLE sourceTable (
        id STRING,
        total_price DOUBLE,
        PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = '$topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'testGroup001',
        'key.format' = 'json',
        'key.json.ignore-parse-errors' = 'true',
        'value.format' = 'json',
        'value.json.fail-on-missing-field' = 'false',
        'value.fields-include' = 'EXCEPT_KEY'
      )
      """.stripMargin(' ')
    tenv.executeSql(ddl)
    val result = tenv.executeSql(
      """
      select * from sourceTable
      """.stripMargin(' '))
    result.print()
    /*
    +----+-----+-------------+
    | op | id  | total_price |
    +----+-----+-------------+
    | +I | id1 |         1.0 |
    | -U | id1 |         1.0 |
    | +U | id1 |         3.0 |
    | -U | id1 |         3.0 |
    | +U | id1 |         6.0 |
    | -U | id1 |         6.0 |
    | +U | id1 |        10.0 |
    | -U | id1 |        10.0 |
    | +U | id1 |        15.0 |
    | -U | id1 |        15.0 |
    | +U | id1 |        21.0 |
    | -U | id1 |        21.0 |
    | +U | id1 |        28.0 |
    | -U | id1 |        28.0 |
    | +U | id1 |        36.0 |
    | -U | id1 |        36.0 |
    | +U | id1 |        45.0 |
    */
  }

  //Test Case 3
  test("read from upsert kafka with consumer console") {
    /*
    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-UpsertKafkaTest-1 --from-beginning
    {"id":"id1","total_price":1.0}
    {"id":"id1","total_price":3.0}
    {"id":"id1","total_price":6.0}
    {"id":"id1","total_price":10.0}
    {"id":"id1","total_price":15.0}
    {"id":"id1","total_price":21.0}
    {"id":"id1","total_price":28.0}
    {"id":"id1","total_price":36.0}
    {"id":"id1","total_price":45.0}
    */
  }
}
With Flink SQL we speak of the duality between tables and streams -- that a stream can be thought of as a (dynamic) table, and vice versa. There are two types of streams/tables: appending and updating. An append stream corresponds to a dynamic table that only performs INSERT operations; nothing is ever deleted or updated. And an update stream corresponds to a dynamic table where rows can be updated and deleted.
Your source table is an upsert-kafka table and, as such, is an updating table (not an appending table). An upsert-kafka source reads the topic the way a compacted topic is meant to be read: each record is an upsert for its key (an insert if the key has not been seen yet, otherwise an update of the last value), and a record with a null value is a delete. The values for the various keys are therefore updated over time.
When an updating table is converted into a stream, there are two possible results: you either get an upsert stream or a retraction stream. Some sinks support one or the other of these types of update streams, and some support both.
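To make the two encodings concrete, here is a minimal sketch in plain Scala. It is only a model of the idea, not Flink's API: the object `UpdateStreamEncodings`, the `Change` types, and both functions are hypothetical names made up for illustration. It takes the same kind of input as the question (prices per id, summed per key) and emits the changes once as a retraction stream and once as an upsert stream.

```scala
// Hypothetical model (not Flink code): two ways to encode the changes of a
// table that keeps a running SUM(price) per id.
object UpdateStreamEncodings {
  sealed trait Change
  final case class Insert(id: String, total: Double) extends Change
  final case class UpdateBefore(id: String, total: Double) extends Change
  final case class UpdateAfter(id: String, total: Double) extends Change

  /** Retraction encoding: an update is a pair, the old row is withdrawn and the new row is added. */
  def retractStream(prices: Seq[(String, Double)]): Seq[Change] = {
    val totals = scala.collection.mutable.Map.empty[String, Double]
    prices.flatMap { case (id, price) =>
      val changes = totals.get(id) match {
        case None      => Seq(Insert(id, price))
        case Some(old) => Seq(UpdateBefore(id, old), UpdateAfter(id, old + price))
      }
      totals(id) = totals.getOrElse(id, 0.0) + price
      changes
    }
  }

  /** Upsert encoding: one keyed message per change; the key tells the reader which row to overwrite. */
  def upsertStream(prices: Seq[(String, Double)]): Seq[(String, Double)] = {
    val totals = scala.collection.mutable.Map.empty[String, Double]
    prices.map { case (id, price) =>
      val total = totals.getOrElse(id, 0.0) + price
      totals(id) = total
      id -> total
    }
  }
}
```

For the input `Seq("id1" -> 1.0, "id1" -> 2.0, "id1" -> 3.0)`, the upsert encoding yields three messages (totals 1.0, 3.0, 6.0), while the retraction encoding yields five changes, because every update after the first row carries the old value as well as the new one.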
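What you are seeing is that the upsert-kafka sink can handle upserts, and the print sink cannot. So the same updating table is written to Kafka as a stream of upsert (and possibly deletion) events, and it is sent to stdout as a stream with an initial insert (+I) for each key, followed by update_before/update_after pairs encoded as -U +U for each update (and deletions, were any to occur). The -U rows are never stored in Kafka as separate messages: Flink derives them on the read side from the last value it has seen for the key, which is why the console consumer shows a single record per update.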
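As a rough illustration of where the -U rows come from, the following plain-Scala sketch turns keyed upsert records (what the console consumer shows) into the rows that the table print shows. It is a simplified model under stated assumptions, not Flink's actual operator: `ChangelogNormalizeSketch` and `normalize` are hypothetical names, and a value of `None` stands in for a Kafka record with a null value (a delete).

```scala
// Hypothetical model (not Flink code): derive +I / -U / +U / -D rows from
// keyed upsert records by remembering the last value seen for each key.
object ChangelogNormalizeSketch {
  /** `None` as the value models a Kafka tombstone (null value), i.e. a delete. */
  def normalize(records: Seq[(String, Option[Double])]): Seq[String] = {
    val lastSeen = scala.collection.mutable.Map.empty[String, Double]
    records.flatMap {
      case (id, Some(value)) =>
        val rows = lastSeen.get(id) match {
          case None       => List(s"+I $id $value")                    // first record for this key
          case Some(prev) => List(s"-U $id $prev", s"+U $id $value")   // old value comes from state, not from Kafka
        }
        lastSeen(id) = value
        rows
      case (id, None) =>
        // Delete: emit a -D row only if the key is currently known.
        lastSeen.remove(id).map(prev => s"-D $id $prev").toList
    }
  }
}
```

Feeding it the first three records from the topic, `Seq("id1" -> Some(1.0), "id1" -> Some(3.0), "id1" -> Some(6.0))`, gives `+I id1 1.0`, `-U id1 1.0`, `+U id1 3.0`, `-U id1 3.0`, `+U id1 6.0`, which matches the start of the printed table in TestCase2. The only input is one record per update; the -U rows are produced from the remembered previous value.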