Paul

Reputation: 906

PyFlink 1.14.2 - Table API DDL - Semantic Exactly Once

I have a scenario where I define a Kafka source, a UDF/UDTF for processing, and a Kafka sink. No matter what I do, when I run the job the output is flooded with the processed result of a single input record. For illustrative purposes, this is what lands on the defined Kafka sink topic:

The timestamps are distinct, showing that the UDF is entered once per consumed record, but it's the same input record being processed every time:

[screenshot of the duplicated sink topic output]

While trying to figure out the problem I've read through whatever Flink documentation I could find (and the rabbit hole of links it leads to) on enforcing exactly-once ('semantic EXACTLY_ONCE') processing of records. As far as I can gather it comes down to enabling exactly-once checkpointing plus the Kafka connector's transactional settings.
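A minimal sketch of how I understand those pieces to fit together (the option names are my reading of the Flink 1.14 Kafka connector docs, so treat them as assumptions rather than a verified recipe):

from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.datastream.checkpointing_mode import CheckpointingMode

env = StreamExecutionEnvironment.get_execution_environment()

# 1) Exactly-once checkpointing: the Kafka sink only commits its transactions
#    when a checkpoint completes.
env.enable_checkpointing(60000, CheckpointingMode.EXACTLY_ONCE)

# 2) Source side: read only records from committed upstream transactions.
#    'properties.*' keys are forwarded verbatim to the Kafka consumer, so the
#    actual Kafka property name 'isolation.level' (with dots) has to be used.
source_option = "'properties.isolation.level' = 'read_committed'"

# 3) Sink side: as far as I can tell, the 1.14 table connector exposes the
#    transactional delivery mode as 'sink.semantic', not as a 'properties.*' key.
sink_option = "'sink.semantic' = 'exactly-once'"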

This video gave me the best visual representation to understand it: semantic_once_video

I've also gone through the Stack Overflow questions I could find on the topic (mostly discussed in terms of Java implementations)... needless to say, it's still not resolved. Here's my code for reference:

import os
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import TableEnvironment, EnvironmentSettings, DataTypes, StreamTableEnvironment
from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf
from pyflink.datastream.checkpointing_mode import CheckpointingMode

KAFKA_SERVERS = os.getenv('KAFKA_BS_SERVERS',"localhost:9094").split(',')
KAFKA_USERNAME = "xxx"
KAFKA_PASSWORD = "_pass_"
KAFKA_SOURCE_TOPIC = 'source_topic'
KAFKA_SINK_TOPIC = 'sink_topic'
KAFKA_GROUP_ID = 'testgroup12'

JAR_DEPENDENCIES = os.getenv('JAR_DEPENDENCIES', '/opt/flink/lib_py')

class tbl_function(TableFunction):

  def open(self, function_context):
      pass

  def eval(self, *args):
    import json
    from datetime import datetime
    res = {
      'time': str(datetime.utcnow()),
      'input': json.loads(args[0])
    }

    yield json.dumps(res)

def pipeline():

  env = StreamExecutionEnvironment.get_execution_environment()
  env.set_parallelism(1)
  for file in os.listdir(JAR_DEPENDENCIES):
    if file.find('.jar') != -1:
      env.add_jars(f"file://{JAR_DEPENDENCIES}/{file}")
      print(f"added jar dep: {JAR_DEPENDENCIES}/{file}")

  env.enable_checkpointing(60000, CheckpointingMode.EXACTLY_ONCE)
  env.get_checkpoint_config().set_min_pause_between_checkpoints(120000)
  env.get_checkpoint_config().enable_unaligned_checkpoints()
  env.get_checkpoint_config().set_checkpoint_interval(30000)

  settings = EnvironmentSettings.new_instance()\
                      .in_streaming_mode()\
                      .use_blink_planner()\
                      .build()

  t_env = StreamTableEnvironment.create(stream_execution_environment= env, environment_settings=settings)

  source_ddl = f"""
            CREATE TABLE source_table(
                entry STRING
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SOURCE_TOPIC}',
              'properties.bootstrap.servers' = '{','.join(KAFKA_SERVERS)}',
              'properties.isolation_level' = 'read_committed',
              'properties.group.id' = '{KAFKA_GROUP_ID}',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
              'scan.startup.mode' = 'earliest-offset',
              'format' = 'raw'
            )
            """

  sink_ddl = f"""
            CREATE TABLE sink_table(
                entry STRING
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SINK_TOPIC}',
              'properties.bootstrap.servers' = '{','.join(KAFKA_SERVERS)}',
              'properties.group.id' = '{KAFKA_GROUP_ID}',
              'properties.processing.mode' = 'exactly_once',
              'properties.enable.idempotence' = 'true',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
              'format' = 'raw'
            )
            """

  t_env.execute_sql(source_ddl).wait()
  t_env.execute_sql(sink_ddl).wait()

  f = tbl_function()
  table_udf = udtf(f, result_types=[DataTypes.STRING()])
  t_env.create_temporary_function("table_f", table_udf)

  table = t_env.from_path('source_table')
  table = table.join_lateral('table_f(entry) as (content)')
  table = table.select('content').alias('entry')
  table.insert_into('sink_table')

  from datetime import datetime
  t_env.execute(f"dummy_test_{str(datetime.now())}")


if __name__ == '__main__':
    pipeline()

JAR dependencies (as logged at startup):

added jar dep: /opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.2.jar

added jar dep: /opt/flink/lib_py/flink-connector-kafka_2.12-1.14.2.jar

added jar dep: /opt/flink/lib_py/kafka-clients-2.4.1.jar

Upvotes: 1

Views: 445

Answers (1)

Paul

Reputation: 906

After a whole bunch of trial and error, and still not precisely knowing why this resolved the issue (or whether it's an underlying PyFlink issue), I found that if you use a table metadata field in your source definition, the pipeline somehow initializes or synchronizes to produce an exactly-once data flow (1 record in = 1 record out, no duplicates).

The only change I had to make is a single line of metadata in the source DDL definition.
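In isolation it looks like this (excerpted verbatim from the script below):

CREATE TABLE source_table(
    entry STRING,
    event_time TIMESTAMP(3) METADATA FROM 'timestamp'  -- the added line
) WITH (
    ...  -- connector options unchanged from the question
)

And again, my full script for reference: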

import os
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import TableEnvironment, EnvironmentSettings, DataTypes, StreamTableEnvironment
from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf
from pyflink.datastream.checkpointing_mode import CheckpointingMode

KAFKA_SERVERS = os.getenv('KAFKA_BS_SERVERS',"localhost:9094").split(',')
KAFKA_USERNAME = "xxx"
KAFKA_PASSWORD = "_pass_"
KAFKA_SOURCE_TOPIC = 'source_topic'
KAFKA_SINK_TOPIC = 'sink_topic'
KAFKA_GROUP_ID = 'testgroup12'

JAR_DEPENDENCIES = os.getenv('JAR_DEPENDENCIES', '/opt/flink/lib_py')

class tbl_function(TableFunction):

  def open(self, function_context):
      pass

  def eval(self, *args):
    import json
    from datetime import datetime
    res = {
      'time': str(datetime.utcnow()),
      'input': json.loads(args[0])
    }

    yield json.dumps(res)

def pipeline():

  env = StreamExecutionEnvironment.get_execution_environment()
  env.set_parallelism(1)
  for file in os.listdir(JAR_DEPENDENCIES):
    if file.find('.jar') != -1:
      env.add_jars(f"file://{JAR_DEPENDENCIES}/{file}")
      print(f"added jar dep: {JAR_DEPENDENCIES}/{file}")

  env.enable_checkpointing(60000, CheckpointingMode.EXACTLY_ONCE)
  env.get_checkpoint_config().set_min_pause_between_checkpoints(120000)
  env.get_checkpoint_config().enable_unaligned_checkpoints()
  env.get_checkpoint_config().set_checkpoint_interval(30000)

  settings = EnvironmentSettings.new_instance()\
                      .in_streaming_mode()\
                      .use_blink_planner()\
                      .build()

  t_env = StreamTableEnvironment.create(stream_execution_environment= env, environment_settings=settings)

  # this sneaky bugger line -> the 'event_time' metadata column below
  source_ddl = f"""
            CREATE TABLE source_table(
                entry STRING,
                event_time TIMESTAMP(3) METADATA FROM 'timestamp'
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SOURCE_TOPIC}',
              'properties.bootstrap.servers' = '{','.join(KAFKA_SERVERS)}',
              'properties.isolation_level' = 'read_committed',
              'properties.group.id' = '{KAFKA_GROUP_ID}',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
              'scan.startup.mode' = 'earliest-offset',
              'format' = 'raw'
            )
            """

  sink_ddl = f"""
            CREATE TABLE sink_table(
                entry STRING
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SINK_TOPIC}',
              'properties.bootstrap.servers' = '{','.join(KAFKA_SERVERS)}',
              'properties.group.id' = '{KAFKA_GROUP_ID}',
              'properties.processing.mode' = 'exactly_once',
              'properties.enable.idempotence' = 'true',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
              'format' = 'raw'
            )
            """

  t_env.execute_sql(source_ddl).wait()
  t_env.execute_sql(sink_ddl).wait()

  f = tbl_function()
  table_udf = udtf(f, result_types=[DataTypes.STRING()])
  t_env.create_temporary_function("table_f", table_udf)

  table = t_env.from_path('source_table')
  table = table.join_lateral('table_f(entry) as (content)')
  table = table.select('content').alias('entry')
  table.insert_into('sink_table')

  from datetime import datetime
  t_env.execute(f"dummy_test_{str(datetime.now())}")


if __name__ == '__main__':
    pipeline()

Hope this saves someone the 3 days I spent on trial and error. #Sigh :(

Upvotes: 0
