Reputation: 13
I'm working with PyFlink and have encountered an issue where my final output contains duplicated records, even though my SQL queries were tested and work correctly as plain SQL. My setup includes a Kafka consumer that receives CalledNumber, and I am trying to join this stream with a static dataset read from a CSV file. Here is my setup.
I first tried executing everything as one big query:
WITH cte AS (
    SELECT
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,UniqueRecordID
        ,Zone
        ,CAST(Code AS DOUBLE) AS Code
        ,Rate
        ,sd.FileName AS FileName
    FROM streaming_data sd
    LEFT JOIN static_data st
        ON sd.CalledNumber LIKE CONCAT(st.Code, '%')
),
LongestMatch AS (
    SELECT
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,MAX(Code) AS Code
        ,UniqueRecordID
        ,FileName
    FROM cte
    GROUP BY
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,UniqueRecordID
        ,FileName
)
SELECT
    CalledNumber
    ,Zone
    ,CAST(st.Code AS VARCHAR(30)) AS Code
    ,CAST(Rate AS VARCHAR(30)) AS Rate
    ,SetupTime
    ,UniqueRecordID
    ,FileName
FROM LongestMatch lm
LEFT JOIN static_data st ON lm.Code = CAST(st.Code AS DOUBLE)
I also tried splitting it into one query plus multiple tables, using the Table API:
from pyflink.table import DataTypes
from pyflink.table import expressions as expr

# ts_env is the StreamTableEnvironment created earlier
static_data = ts_env.from_path("static_data")
static_data_renamed = static_data.select(
    expr.col("Zone").alias("static_zone"),
    expr.col("Code"),
    expr.col("Rate"),
)

cte_query = """
SELECT
    SetupTime
    ,CallingNumber
    ,CalledNumber
    ,UniqueRecordID
    ,CAST(Code AS DOUBLE) AS Code
    ,sd.FileName AS FileName
FROM streaming_data sd
LEFT JOIN static_data st
    ON sd.CalledNumber LIKE CONCAT(st.Code, '%')
"""
cte = ts_env.sql_query(cte_query)
ts_env.create_temporary_view("cte", cte)

# Group by the record identity and calculate the maximum 'Code' for each group
max_code_by_zone = (
    cte.group_by(
        expr.col("CalledNumber"),
        expr.col("SetupTime"),
        expr.col("UniqueRecordID"),
        expr.col("FileName"),
    )
    .select(
        expr.col("Code").max.alias("max_code"),
        expr.col("CalledNumber"),
        expr.col("SetupTime"),
        expr.col("UniqueRecordID"),
        expr.col("FileName"),
    )
)

# Join the tables on the specified condition
joined_table = (
    static_data_renamed
    .join(max_code_by_zone)
    .where(expr.col("Code").cast(DataTypes.DOUBLE()) == expr.col("max_code"))
)

# Select specific columns from the joined table
final_result = (
    joined_table
    .select(
        expr.col("CalledNumber"),
        expr.col("static_zone").alias("Zone"),
        expr.col("Code").cast(DataTypes.STRING()).alias("Code"),
        expr.col("Rate").cast(DataTypes.STRING()).alias("Rate"),
        expr.col("SetupTime"),
        expr.col("UniqueRecordID"),
        expr.col("FileName"),
    )
)
final_result.execute().print()
Here is a sample of the result; note the +I / -U / +U changelog rows:
| +I | xxxxxxxxxxx | 236 | 0.31771425 | 2024-08-09T14:17:50.927606 | ef15016b-8a9e-430a-acf4-cb0
| -U | xxxxxxxxxxx | 236 | 0.31771425 | 2024-08-09T14:17:50.927606 | ef15016b-8a9e-430a-acf4-cb0
| +I | zzzzzzzzzz | 266 | 0.4332153 | 2024-08-09T14:17:50.927606 | b08f3a3f-4f68-4915-a7c8-7c2
| +U | yyyyyyyyyyyy | 23350 | 0.16603965 | 2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
| +I | yyyyyyyyyyyy | 233 | 0.21922425 | 2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
| -U | yyyyyyyyyyyy | 233 | 0.21922425 | 2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
I also tried applying ROW_NUMBER() in order to exclude the intermediate rows and keep only the final result. Is there a similar approach when using Kafka, such as emitting only final results with windowing? A sketch of the ROW_NUMBER() variant is below.
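For reference, the ROW_NUMBER() variant looked roughly like this (a sketch, assuming UniqueRecordID identifies one input record and that the numerically largest Code is the longest prefix match):

SELECT CalledNumber, SetupTime, UniqueRecordID, FileName, Code
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY UniqueRecordID
            ORDER BY Code DESC
        ) AS rn
    FROM cte
)
WHERE rn = 1

As far as I understand, in streaming mode this Top-1 query still produces an updating changelog (+I/-U/+U) rather than an append-only stream, which is why I am asking about emit-final behaviour.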
There is also a second approach, but I failed at the first step: when trying to define SetupTime as Types.SQL_TIMESTAMP() or as DataTypes.TIMESTAMP_LTZ() in the schema, there is always an issue at deserialization. First error:
java.time.format.DateTimeParseException: Text '2024-08-09 14:27:20.968' could not be parsed at index 10
Second error:
Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.Instant (java.sql.Timestamp is in module java.sql of loader 'platform';
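For context, the raw timestamps look like '2024-08-09 14:27:20.968' (a space rather than a 'T' at index 10, which is where the first parser fails), and the ClassCastException suggests TIMESTAMP_LTZ expects java.time.Instant while the deserializer produced java.sql.Timestamp. One declaration I considered but have not fully verified is a SQL DDL source with a plain TIMESTAMP(3) column (topic name and connector properties here are placeholders):

CREATE TABLE streaming_data (
    SetupTime       TIMESTAMP(3),
    CallingNumber   STRING,
    CalledNumber    STRING,
    UniqueRecordID  STRING,
    FileName        STRING,
    WATERMARK FOR SetupTime AS SetupTime - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = '...',                         -- placeholder
    'properties.bootstrap.servers' = '...',  -- placeholder
    'format' = 'json',
    -- 'SQL' (the default) parses 'yyyy-MM-dd HH:mm:ss.SSS' with a space;
    -- 'ISO-8601' would expect a 'T' separator instead
    'json.timestamp-format.standard' = 'SQL'
);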
Upvotes: 1
Views: 59
Reputation: 13
I used a tumbling window, following the documentation, and it solved the issue. I don't know if there is an alternative way. A sketch of what I ended up with is below.
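Roughly the shape of the windowed aggregation, per the windowing-TVF documentation (a sketch: the one-minute interval and grouping keys are from my use case, and SetupTime must still be recognized as an event-time attribute with a watermark at this point; a regular join can materialize the time attribute, in which case the window has to be applied to the source stream instead):

SELECT
    window_start,
    window_end,
    CalledNumber,
    UniqueRecordID,
    FileName,
    MAX(Code) AS Code
FROM TABLE(
    TUMBLE(TABLE cte, DESCRIPTOR(SetupTime), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, CalledNumber, UniqueRecordID, FileName

Because a window aggregation emits each window's result only once the watermark passes the window end, the output is append-only and the -U/+U retraction rows disappear.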
Upvotes: 0