user8045747

Reputation: 13

How to Avoid Duplicated Records in PyFlink When Joining Kafka Stream with Static Data?

I'm working with PyFlink, and my final output contains duplicated records even though the same queries return the correct result when tested as plain SQL. My setup includes a Kafka consumer that receives records containing CalledNumber, and I am trying to join this stream with a static dataset read from a CSV file. Here is my setup:

First, I tried executing everything as one big query:


 with cte as(
        select 
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,UniqueRecordID
        ,Zone
        ,CAST( Code as DOUBLE) as Code
        ,Rate
        ,sd.FileName as FileName
        from streaming_data sd
        left join static_data st
        ON sd.CalledNumber LIKE CONCAT(st.Code, '%')
        )
        ,
        LongestMatch as(
        select 
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,max(Code) as Code
        ,UniqueRecordID
        ,FileName
        from cte
        group by
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,UniqueRecordID
        ,FileName
        )
        select 
        CalledNumber
        ,Zone
        ,CAST(st.Code AS VARCHAR(30)) AS Code
        ,CAST(Rate AS VARCHAR(30)) AS Rate
        ,SetupTime
        ,UniqueRecordID
        ,FileName
        from LongestMatch lm
        left join static_data st on lm.Code= CAST (st.Code as DOUBLE)
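
For reference, the intent of the query above (pick the static row whose Code is the longest prefix of CalledNumber) can be sketched in plain Python. This is only a batch-style reference for checking expected results, not PyFlink code; the static rows and phone numbers below are made up for illustration.

```python
from typing import Optional

# Hypothetical static rows: (Code, Zone, Rate). Values are invented for illustration.
STATIC_DATA = [
    ("233", "Zone A", 0.22),
    ("23350", "Zone B", 0.17),
    ("236", "Zone C", 0.32),
]


def longest_prefix_match(called_number: str, static_rows) -> Optional[tuple]:
    """Return the static row whose Code is the longest prefix of called_number."""
    candidates = [row for row in static_rows if called_number.startswith(row[0])]
    if not candidates:
        return None  # mirrors the LEFT JOIN case where no code matches
    # Compare by prefix length rather than by numeric value of the code.
    return max(candidates, key=lambda row: len(row[0]))


print(longest_prefix_match("233501234567", STATIC_DATA))
print(longest_prefix_match("236000000000", STATIC_DATA))
print(longest_prefix_match("999000000000", STATIC_DATA))
```

Exactly one row per called number comes back here because the whole input is known up front. On an unbounded stream, the same aggregation has to revise its answer as candidates arrive.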

I also tried splitting the logic into one SQL query plus several Table API steps:

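    # Assumes: from pyflink.table import DataTypes, expressions as expr
    # and that ts_env is the existing TableEnvironment.
    static_data = ts_env.from_path("static_data")
    static_data_renamed = static_data.select(
        expr.col("Zone").alias("static_zone"),
        expr.col("Code"),
        expr.col("Rate"),
    )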


    cte_query= """
         select 
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,UniqueRecordID
        ,CAST( Code as DOUBLE) as Code
        ,sd.FileName as FileName        
        from streaming_data sd
        left join static_data st
        ON sd.CalledNumber LIKE CONCAT(st.Code, '%')
    """
    cte = ts_env.sql_query(cte_query)
    ts_env.create_temporary_view("cte", cte)
  
    # Group by call record and take the maximum 'Code' as the longest match
    max_code_by_zone = (
        cte.group_by(
            expr.col("CalledNumber")
            ,expr.col("SetupTime")
            ,expr.col("UniqueRecordID")
            ,expr.col("FileName")
            )
        .select(
            expr.col("Code").max.alias("max_code")
            ,expr.col("CalledNumber")
            ,expr.col("SetupTime")
            ,expr.col("UniqueRecordID")
            ,expr.col("FileName")
            )
    )
    
   
    # Join the static data to the aggregated result on the matched code
    joined_table = (
        static_data_renamed
        .join(max_code_by_zone)
        .where( expr.col("Code").cast(DataTypes.DOUBLE()) == expr.col("max_code"))
    )

    # Select the output columns from the joined table
    final_result = (
        joined_table
        .select(
            expr.col("CalledNumber"),
            expr.col("static_zone").alias("Zone"),
            expr.col("Code").cast(DataTypes.STRING()).alias("Code"),
            expr.col("Rate").cast(DataTypes.STRING()).alias("Rate"),
            expr.col("SetupTime"),
            expr.col("UniqueRecordID"),
            expr.col("FileName")
        )
    )

    final_result.execute().print()

Here is a sample of the result:

| +I |                    xxxxxxxxxxx |               236 |     0.31771425 |     2024-08-09T14:17:50.927606 | ef15016b-8a9e-430a-acf4-cb0
| -U |                    xxxxxxxxxxx |               236 |     0.31771425 |     2024-08-09T14:17:50.927606 | ef15016b-8a9e-430a-acf4-cb0
| +I |                     zzzzzzzzzz |               266 |      0.4332153 |     2024-08-09T14:17:50.927606 | b08f3a3f-4f68-4915-a7c8-7c2
| +U |                   yyyyyyyyyyyy |             23350 |     0.16603965 |     2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
| +I |                   yyyyyyyyyyyy |               233 |     0.21922425 |     2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
| -U |                   yyyyyyyyyyyy |               233 |     0.21922425 |     2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
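
The +I, -U and +U markers in this output are Flink changelog row kinds (insert, update-before, update-after). A non-windowed GROUP BY over an unbounded stream emits a revision each time a new candidate arrives, so printing the result shows every intermediate change rather than only the final rows. A minimal plain-Python sketch of how such a changelog collapses into a final table follows; the rows are hypothetical and reduced to two columns.

```python
from collections import Counter


def materialize(changelog):
    """Fold (row_kind, row) pairs into the rows that remain at the end."""
    state = Counter()
    for kind, row in changelog:
        if kind in ("+I", "+U"):    # insert / update-after: row becomes visible
            state[row] += 1
        elif kind in ("-U", "-D"):  # update-before / delete: row is retracted
            state[row] -= 1
        else:
            raise ValueError(f"unknown row kind: {kind}")
    # Unary plus drops rows whose count fell to zero or below.
    return sorted((+state).elements())


# Hypothetical changelog shaped like the sample: (CalledNumber, Code).
changelog = [
    ("+I", ("yyyyyyyyyyyy", "233")),
    ("-U", ("yyyyyyyyyyyy", "233")),
    ("+U", ("yyyyyyyyyyyy", "23350")),
    ("+I", ("zzzzzzzzzz", "266")),
]

for row in materialize(changelog):
    print(row)
```

Counting rows instead of replaying them in order keeps the fold correct even when printed lines arrive out of order, as they appear to in the sample.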

I also tried applying ROW_NUMBER() to exclude the intermediate rows and keep only the final result.

Is there a similar approach for a Kafka source, such as emitting only the final result with windowing?

There is also a second approach, but I failed at the first step. When I define SetupTime as Types.SQL_TIMESTAMP() or DataTypes.TIMESTAMP_LTZ() in the schema, there is always an issue at deserialization. First error:

 java.time.format.DateTimeParseException: Text '2024-08-09 14:27:20.968' could not be parsed at index 10
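
Index 10 of that text is the space between the date and the time, which suggests the parser expected an ISO-8601 style `T` separator at that position. One possible workaround, assuming the producer side can be changed, is to normalize the timestamp text before it reaches Kafka. The helper name `to_iso8601` below is hypothetical.

```python
from datetime import datetime

raw = "2024-08-09 14:27:20.968"

# The character at the index named in the exception is the space separator.
print(repr(raw[10]))


def to_iso8601(text: str) -> str:
    """Convert 'YYYY-MM-DD HH:MM:SS.fff' text to ISO-8601 with a 'T' separator."""
    parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")
    return parsed.isoformat(timespec="milliseconds")


print(to_iso8601(raw))  # 2024-08-09T14:27:20.968
```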

Second error:

Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.Instant (java.sql.Timestamp is in module java.sql of loader 'platform'; 

Upvotes: 1

Views: 59

Answers (1)

user8045747

Reputation: 13

Following the documentation, I used a tumbling window, which solved the issue. I don't know whether there is an alternative way.
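
To illustrate why a tumbling window removes the intermediate rows: results for a window are emitted once, after the window closes, so only the best candidate per record survives. The following is a plain-Python simulation of that behaviour, not the PyFlink windowing API; the 10-second window size, the record IDs and the codes are all hypothetical.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=10)  # hypothetical window size
EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime, size: timedelta = WINDOW) -> datetime:
    """Floor a timestamp to the start of its tumbling window."""
    return ts - (ts - EPOCH) % size


def emit_final(events, size: timedelta = WINDOW):
    """events: iterable of (event_time, record_id, code) candidate matches.

    Keeps the longest code per (window, record) and returns one row per key,
    as if emitted only when each window has closed.
    """
    best = {}
    for ts, record_id, code in events:
        key = (window_start(ts, size), record_id)
        if key not in best or len(code) > len(best[key]):
            best[key] = code
    return [(start, rid, code) for (start, rid), code in sorted(best.items())]


t = datetime(2024, 8, 9, 14, 17, 50)
events = [
    (t, "rec-1", "233"),    # first candidate for rec-1
    (t, "rec-1", "23350"),  # longer match for the same record
    (t + timedelta(seconds=3), "rec-2", "266"),
]

for row in emit_final(events):
    print(row)
```

The trade-off is latency: nothing is emitted for a record until its window ends.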

Upvotes: 0
