I am trying to perform an aggregation with the Flink Table API, where the user passes in the group-by field and the aggregation expressions as string parameters.
Input:
count(employeeId), max(salary)
Is there any way to do this with the Flink Table API? Does Flink have anything equivalent to the selectExpr function in Spark?
https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.selectExpr.html
I tried the following, but it didn't work:
employeeTable
    .groupBy($("department"))
    .select(
        $("department"),
        $("count(employeeId)").as("numberOfEmployees"),
        $("max(salary)").as("maxSalary")
    );
It throws the following exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve field [count(employeeId)], input field list:[department].
No, I don't believe this will work. Flink's SQL planner wants to know what the query is doing at compile time.
What you can do is construct a SQL query and create a new job to run that query. The SQL gateway that's coming in Flink 1.16 (see FLIP-91) should make this easier.
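Building on this answer, here is a minimal sketch of assembling the SQL query text from the user's parameters in plain Java. The class name AggregationQueryBuilder and the table name employees are hypothetical. The group-by field and table name are checked to be plain identifiers, but the aggregation expressions are copied into the query verbatim, so in real use they should be validated or whitelisted first.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: builds a GROUP BY query string from user-supplied parts.
final class AggregationQueryBuilder {

    // Only plain identifiers are accepted for the table and group-by column
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    static String build(String tableName, String groupByField, List<String> aggregations) {
        requireIdentifier(tableName);
        requireIdentifier(groupByField);
        // Backticks quote identifiers in Flink SQL
        StringBuilder sql = new StringBuilder("SELECT `").append(groupByField).append("`");
        for (String aggregation : aggregations) {
            // NOTE: inserted verbatim; validate user input before doing this for real
            sql.append(", ").append(aggregation);
        }
        sql.append(" FROM `").append(tableName)
           .append("` GROUP BY `").append(groupByField).append("`");
        return sql.toString();
    }

    private static void requireIdentifier(String name) {
        if (!IDENTIFIER.matcher(name).matches()) {
            throw new IllegalArgumentException("Not a plain identifier: " + name);
        }
    }

    public static void main(String[] args) {
        String sql = build("employees", "department",
                List.of("count(employeeId) AS numberOfEmployees", "max(salary) AS maxSalary"));
        System.out.println(sql);
    }
}
```

Assuming a TableEnvironment named tableEnv, the input table could be registered with tableEnv.createTemporaryView("employees", employeeTable) and the generated string passed to tableEnv.sqlQuery(sql) to get back a Table. The query is still fixed when the job is built, so a new set of user parameters means building and submitting a new job, as described above.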
I think your syntax is wrong.
.select(
    $("department"),
    $("count(employeeId)").as("numberOfEmployees"),
    $("max(salary)").as("maxSalary")
)
You should call count and max like this:
$("employeeId").count().as("numberOfEmployees"),
$("salary").max().as("maxSalary")
You can check the list of built-in functions here.
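To keep the user's string input while using the method-chained form from this answer, one option is to parse each string into a function name and a field name first. The sketch below is a hypothetical helper (AggregationSpecParser is not part of Flink) that accepts only "function(field)" with a whitelisted function, rejecting anything else.

```java
import java.util.Locale;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: splits a string such as "count(employeeId)"
// into a function name and a field name.
final class AggregationSpecParser {

    // function name, "(", plain identifier, ")" with optional whitespace around each part
    private static final Pattern CALL = Pattern.compile(
            "\\s*([A-Za-z]+)\\s*\\(\\s*([A-Za-z_][A-Za-z0-9_]*)\\s*\\)\\s*");

    // Functions this sketch is willing to map to Table API calls
    private static final Set<String> SUPPORTED = Set.of("count", "max", "min", "sum", "avg");

    static final class AggregationSpec {
        final String function;
        final String field;

        AggregationSpec(String function, String field) {
            this.function = function;
            this.field = field;
        }
    }

    static AggregationSpec parse(String expression) {
        Matcher m = CALL.matcher(expression);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported aggregation expression: " + expression);
        }
        // Normalize case so "MAX(salary)" and "max(salary)" are treated the same
        String function = m.group(1).toLowerCase(Locale.ROOT);
        if (!SUPPORTED.contains(function)) {
            throw new IllegalArgumentException("Unsupported aggregation function: " + function);
        }
        return new AggregationSpec(function, m.group(2));
    }

    public static void main(String[] args) {
        for (String input : new String[] {"count(employeeId)", "max(salary)"}) {
            AggregationSpec spec = parse(input);
            System.out.println(spec.function + " -> " + spec.field);
        }
    }
}
```

Each parsed spec could then be mapped to the corresponding call on Flink's ApiExpression, for example $(spec.field).count() when the function is "count" and $(spec.field).max() when it is "max" ($ comes from org.apache.flink.table.api.Expressions), and the resulting expressions passed to employeeTable.groupBy($("department")).select(...). The whitelist keeps arbitrary user text out of the query.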