tpuli

Reputation: 435

Is there any way we can parse a string expression in the Apache Flink Table API?

I am trying to perform aggregation using Flink Table API by accepting group by field and field aggregation expressions as string parameters from the user.

Input

Is there any way to do this with the Flink Table API? I tried the following, but it didn't work. Does Flink have anything equivalent to the selectExpr function in Spark? https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.selectExpr.html

employeeTable
      .groupBy($("department"))
      .select(
        $("department"),
        $("count(employeeId)").as("numberOfEmployees"),
        $("max(salary)").as("maxSalary")
      )

It throws the following exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve field [count(employeeId)], input field list:[department].

Upvotes: 0

Views: 510

Answers (2)

David Anderson

Reputation: 43499

No, I don't believe this will work. Flink's SQL planner wants to know what the query is doing at compile time.

What you can do is construct a SQL query and create a new job to run that query. The SQL gateway that's coming in Flink 1.16 (see FLIP-91) should make this easier.
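
As a rough sketch of the "construct a SQL query" idea, the user-supplied strings could be assembled into a SQL statement before the job is built. The class `AggregationQueryBuilder`, the method `buildQuery`, and the table name `employees` are hypothetical names chosen for illustration; only the field names and aliases come from the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class AggregationQueryBuilder {

    // Hypothetical helper (not part of Flink): builds
    //   SELECT <groupBy>, <aggregate> AS <alias>, ... FROM <table> GROUP BY <groupBy>
    // The map goes from output alias to a SQL aggregate expression supplied by the user.
    public static String buildQuery(String table, String groupByField,
                                    Map<String, String> aggregations) {
        StringJoiner select = new StringJoiner(", ");
        select.add(groupByField);
        for (Map.Entry<String, String> entry : aggregations.entrySet()) {
            select.add(entry.getValue() + " AS " + entry.getKey());
        }
        return "SELECT " + select + " FROM " + table + " GROUP BY " + groupByField;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the columns in insertion order.
        Map<String, String> aggregations = new LinkedHashMap<>();
        aggregations.put("numberOfEmployees", "COUNT(employeeId)");
        aggregations.put("maxSalary", "MAX(salary)");

        String sql = buildQuery("employees", "department", aggregations);
        System.out.println(sql);
        // Assumption: with a TableEnvironment in scope and the table registered
        // under the name "employees", this string could be passed to
        // tableEnv.sqlQuery(sql) when the job is defined.
    }
}
```

Since the strings come from the user, they should be validated (for example against a whitelist of field and function names) before being concatenated into a query.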

Upvotes: 1

Niko

Reputation: 680

I think your syntax is wrong. `$(...)` takes a field name, not an expression, so this does not resolve:

  .select(
    $("department"),
    $("count(employeeId)").as("numberOfEmployees"),
    $("max(salary)").as("maxSalary")
  )

You should call count and max as methods on the field expression, like this:

  $("employeeId").count().as("numberOfEmployees"),
  $("salary").max().as("maxSalary")

You can check the built-in functions in the Flink Table API documentation.
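
If the aggregations still have to arrive as strings such as "count(employeeId)", one possible approach is to split each string into a function name and a field name first, and then pick the matching method call. The sketch below covers only the splitting step in plain Java; the class `AggregateSpec` and its `parse` method are hypothetical names, and it only handles the simple `function(field)` shape.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AggregateSpec {

    // Matches strings of the form "function(field)", allowing optional spaces,
    // e.g. "count(employeeId)" or "max( salary )".
    private static final Pattern CALL =
            Pattern.compile("\\s*(\\w+)\\s*\\(\\s*(\\w+)\\s*\\)\\s*");

    public final String function; // lower-cased function name, e.g. "count"
    public final String field;    // field name as written, e.g. "employeeId"

    public AggregateSpec(String function, String field) {
        this.function = function;
        this.field = field;
    }

    // Hypothetical parser: rejects anything that is not a single function(field) call.
    public static AggregateSpec parse(String text) {
        Matcher matcher = CALL.matcher(text);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unsupported aggregate expression: " + text);
        }
        return new AggregateSpec(matcher.group(1).toLowerCase(Locale.ROOT), matcher.group(2));
    }

    public static void main(String[] args) {
        AggregateSpec spec = parse("count(employeeId)");
        System.out.println(spec.function + " on " + spec.field);
        // In the Flink job, a switch on spec.function could then choose the call:
        //   "count" -> $(spec.field).count()
        //   "max"   -> $(spec.field).max()
        // and reject any function name that is not explicitly supported.
    }
}
```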

Upvotes: 0
