Reputation: 47
I'm trying to run a left outer join on two tables and convert the results to a DataStream.
All the joins I've done before using Flink have been inner joins, and I have always followed the join with a .toRetractStream[MyCaseClass](someQueryConfig). However, with the introduction of null values due to the left join, my understanding from the Flink docs is that I can no longer use case classes, because they don't support null values when converting a table to a DataStream.
So, I'm trying to accomplish this using a POJO. Here is my code:
class EnrichedTaskUpdateJoin(
  val enrichedTaskId: String,
  val enrichedTaskJobId: String,
  val enrichedTaskJobDate: String,
  val enrichedTaskJobMetadata: Json,
  val enrichedTaskStartedAt: String,
  val enrichedTaskTaskMetadata: Json,
  val taskUpdateMetadata: Json = Json.Null
)
val qConfig = tableEnv.queryConfig
qConfig.withIdleStateRetentionTime(IDLE_STATE_RETENTION_TIME)
val updatedTasksUpsertTable = enrichedTasksUpsertTable
  .leftOuterJoin(taskUpdatesUpsertTable, 'enrichedTaskId === 'taskUpdateId)
  .select(
    'enrichedTaskId,
    'enrichedTaskJobId,
    'enrichedTaskJobDate,
    'enrichedTaskJobMetadata,
    'enrichedTaskStartedAt,
    'enrichedTaskTaskMetadata,
    'taskUpdateMetadata
  )
val updatedEnrichedTasksStream: KeyedStream[String, String] = updatedTasksUpsertTable
  .toAppendStream[EnrichedTaskUpdateJoin](qConfig)
  .map(toEnrichedTask(_))
  .map(encodeTask(_))
  .keyBy(x => parse(x).getOrElse(Json.Null).hcursor.get[String]("id").getOrElse(""))
This compiles just fine, but when I try to run it, I get org.apache.flink.table.api.TableException: Unsupported join type 'LEFT'. Currently only non-window inner joins with at least one equality predicate are supported. However, according to these docs, it seems like I should be able to run a left join. It also seems worth noting that the error gets thrown from the .toAppendStream[EnrichedTaskUpdateJoin](qConfig) call.
I thought perhaps the "non-window" portion of the error implied that my idle state retention time was a problem, so I took the query config out, but got the same error.
Hopefully this has enough context, but if I need to add anything else, please let me know. Also, I'm running Flink 1.5-SNAPSHOT and using Circe for JSON parsing. I'm also quite new to Scala, so it's very possible that this is just some dumb syntax error.
Upvotes: 2
Views: 805
Reputation: 2664
Non-windowed outer joins are not supported in Flink 1.5-SNAPSHOT. As you can see in the link that you have posted, there is no "Streaming" tag next to "Outer Joins". Time-windowed joins (which work on time attributes) were supported in 1.5.
Flink 1.6 will provide LEFT, RIGHT, and FULL outer joins (see also FLINK-5878).
Btw. make sure that EnrichedTaskUpdateJoin is really a POJO, because POJOs need a default constructor and, I think, also var instead of val.
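To illustrate the POJO remark, here is a minimal sketch of how the class could be reshaped, assuming the usual Flink POJO rules (public class, public no-argument constructor, mutable fields). The metadata fields are typed as String here instead of Circe's Json purely to keep the example self-contained; that substitution is an assumption for illustration, not something from the question.

```scala
// Sketch only: a POJO-style variant of EnrichedTaskUpdateJoin.
// Assumption: metadata fields are String rather than Circe's Json,
// so the snippet compiles without extra dependencies.
class EnrichedTaskUpdateJoin(
    var enrichedTaskId: String,
    var enrichedTaskJobId: String,
    var enrichedTaskJobDate: String,
    var enrichedTaskJobMetadata: String,
    var enrichedTaskStartedAt: String,
    var enrichedTaskTaskMetadata: String,
    var taskUpdateMetadata: String // stays null when the left join finds no match
) {
  // No-argument constructor; every field starts as null and is set later
  // through the setters that Scala generates for var fields.
  def this() = this(null, null, null, null, null, null, null)
}
```

With var fields, an instance can be created empty and filled in afterwards, for example `val t = new EnrichedTaskUpdateJoin(); t.enrichedTaskId = "task-1"`, and a missing right-hand side of the join can be represented by leaving taskUpdateMetadata as null.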
Upvotes: 1