
Reputation: 691

Flink multi source with kafka, kinesis and TableEnvironment

I'm new to Flink and hope someone can help. I have tried to follow Flink tutorials.

We have a requirement to consume from two sources:

  1. A Kafka topic.

When an event arrives on the Kafka topic, the JSON event fields (mobile_acc_id, member_id, mobile_number, valid_from, valid_to) need to be stored in an external database (Postgres).

  2. A Kinesis stream.

When an event arrives on the Kinesis stream, we need to look up the event's mobile_number in the Postgres database (from step 1), extract the member_id, enrich the incoming Kinesis event with it, and sink the result to another output stream.

So I set up a Stream and a Table environment like this:

public static StreamExecutionEnvironment initEnv() {
    var env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setAutoWatermarkInterval(0L); // an interval of 0 disables periodic watermark emission
    return env;
}

public static TableEnvironment initTableEnv() {
    var settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    return TableEnvironment.create(settings);
}

Calling the process(..) method with initEnv() uses Kinesis as the source:

process(config.getTransformerConfig(), input, sink, deadLetterSink, initEnv());

Inside process(..) I also initialise the Table Environment using initTableEnv(), hoping that Flink will consume from both sources when I call env.execute(..):

public static void process(TransformerConfig cfg, SourceFunction<String> source, SinkFunction<UsageSummaryWithHeader> sink,
                           SinkFunction<DeadLetterEvent> deadLetterSink, StreamExecutionEnvironment env) throws Exception {
    var events =
            StreamUtils.source(source, env, "kinesis-events", cfg.getInputParallelism());

    collectInSink(transform(cfg, events, deadLetterSink), sink, "kinesis-summary-events", cfg.getOutputParallelism());
}

private static void processStreamIntoTable(TableEnvironment tableEnv) throws Exception {
    // Source table backed by the Kafka topic
    tableEnv.executeSql("CREATE TABLE mobile_accounts (\n" +
            "    mobile_acc_id VARCHAR(36) NOT NULL,\n" +
            "    member_id     BIGINT      NOT NULL,\n" +
            "    mobile_number VARCHAR(14) NOT NULL,\n" +
            "    valid_from    TIMESTAMP   NOT NULL,\n" +
            "    valid_to      TIMESTAMP   NOT NULL\n" +
            ") WITH (\n" +
            "    'connector' = 'kafka',\n" +
            "    'topic'     = 'mobile_accounts',\n" +
            "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
            "    'format'    = 'json'\n" +
            ")");

    // Table backed by the Postgres table, accessed through the JDBC connector
    tableEnv.executeSql("CREATE TABLE mobile_account\n" +
            "(\n" +
            "    mobile_acc_id VARCHAR(36) NOT NULL,\n" +
            "    member_id     BIGINT      NOT NULL,\n" +
            "    mobile_number VARCHAR(14) NOT NULL,\n" +
            "    valid_from    TIMESTAMP   NOT NULL,\n" +
            "    valid_to      TIMESTAMP   NOT NULL\n" +
            ") WITH (\n" +
            "   'connector'  = 'jdbc',\n" +
            "   'url'        = 'jdbc:postgresql://flinkpg:5432/flink-demo',\n" +
            "   'table-name' = 'mobile_account',\n" +
            "   'driver'     = 'org.postgresql.Driver',\n" +
            "   'username'   = 'flink-demo',\n" +
            "   'password'   = 'flink-demo'\n" +
            ")");

    Table mobileAccounts = tableEnv.from("mobile_accounts");
}

public static Table report(Table mobileAccounts) {

What I have noticed in the Flink console is that the job only consumes from one source!

I liked TableEnvironment as not much code is needed to get the items inserted into the DB.

How can we consume from both sources, Kinesis and the Kafka-backed TableEnvironment, in Flink?

Am I using the right approach?

Is there an alternative way to implement my requirements?

Upvotes: 0

Views: 310

Answers (1)

Metehan Yıldırım

Reputation: 401

Assuming you can create the tables correctly, you can simply JOIN the two streams, here named kafka_stream and kinesis_stream:

SELECT l.*, r.member_id FROM kinesis_stream AS l
INNER JOIN kafka_stream AS r
ON l.mobile_number = r.mobile_number;
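
Since the question looks members up in Postgres by mobile_number, a lookup join against the JDBC table may be another option worth considering. The following is only a sketch under assumptions: the kinesis_stream columns, the stream name and the region are made up for illustration, mobile_account is the JDBC table from the question, and the Kinesis option names depend on the connector version in use.

```sql
-- Hypothetical Kinesis source table. The payload column, stream name and
-- region are assumptions; newer connector versions use 'stream.arn'
-- instead of 'stream'.
CREATE TABLE kinesis_stream (
    mobile_number VARCHAR(14),
    payload       STRING,
    proc_time AS PROCTIME()  -- processing-time attribute required by the lookup join
) WITH (
    'connector'  = 'kinesis',
    'stream'     = 'usage-events',
    'aws.region' = 'eu-west-1',
    'format'     = 'json'
);

-- Lookup join: each Kinesis row queries the JDBC-backed mobile_account
-- table as of its processing time.
SELECT l.mobile_number, l.payload, r.member_id
FROM kinesis_stream AS l
JOIN mobile_account FOR SYSTEM_TIME AS OF l.proc_time AS r
    ON l.mobile_number = r.mobile_number;
```

Unlike a regular stream-to-stream join, which keeps both inputs in Flink state, a lookup join reads the current row from Postgres for each incoming event, so it only sees accounts that have already been written there.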

If the PostgreSQL sink is essential, you can populate it with a separate query:

INSERT INTO postgre_sink
SELECT * FROM kafka_stream;

Together, these queries solve your problem with the Table API (or Flink SQL).
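
If both pipelines should be submitted together as a single job, the two INSERT statements can be grouped in a statement set. A rough sketch, assuming a hypothetical enriched_sink table for the output stream and a Flink version that supports EXECUTE STATEMENT SET (older SQL clients use BEGIN STATEMENT SET; ... END; instead):

```sql
EXECUTE STATEMENT SET
BEGIN
    -- Kafka -> Postgres
    INSERT INTO postgre_sink
    SELECT * FROM kafka_stream;

    -- Kinesis events enriched with member_id -> output stream
    -- (enriched_sink is a hypothetical table, not defined in the question)
    INSERT INTO enriched_sink
    SELECT l.*, r.member_id
    FROM kinesis_stream AS l
    INNER JOIN kafka_stream AS r
        ON l.mobile_number = r.mobile_number;
END;
```

From Java, the same grouping is available through tableEnv.createStatementSet() with addInsertSql(...) and execute(). If part of the pipeline has to stay in the DataStream API, creating the table environment with StreamTableEnvironment.create(env) keeps both APIs on the same execution environment, rather than on two unrelated ones as in the question.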

Upvotes: 1
