Ajay Chinni
Ajay Chinni

Reputation: 850

Py4JJavaError in pyflink Table api

This code converts pandas to flink table do the transformation than again converting back to pandas. It perfectly works fine when I use filter filter than select but gives me an error when i add group_by and order_by.

import pandas as pd
import numpy as np

f_s_env = StreamExecutionEnvironment.get_execution_environment()
f_s_settings = EnvironmentSettings.new_instance().use_old_planner().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(f_s_env, environment_settings=f_s_settings)

df = pd.read_csv("dataBase/New/candidate.csv")

col = ['candidate_id', 'candidate_source_id', 'candidate_first_name',
       'candidate_middle_name', 'candidate_last_name', 'candidate_email',
       'created_date', 'last_modified_date', 'last_modified_by']

table = table_env.from_pandas(df,col)
table.filter("candidate_id > 322445")\
    .filter("candidate_first_name === 'Libby'")\
    .group_by("candidate_id, candidate_source_id")\
    .select("candidate_id, candidate_source_id")\
    .order_by("candidate_id").to_pandas()

My error is

Py4JJavaError: An error occurred while calling o3164.orderBy.
: org.apache.flink.table.api.ValidationException: A limit operation on unbounded tables is currently not supported.
    at org.apache.flink.table.operations.utils.SortOperationFactory.failIfStreaming(SortOperationFactory.java:131)
    at org.apache.flink.table.operations.utils.SortOperationFactory.createSort(SortOperationFactory.java:63)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.sort(OperationTreeBuilder.java:409)
    at org.apache.flink.table.api.internal.TableImpl.orderBy(TableImpl.java:401)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:745)

Upvotes: 1

Views: 1033

Answers (1)

David Anderson
David Anderson

Reputation: 43707

If you look in the documentation, you will see that with the Table API, ORDER BY is only supported for batch queries. If you switch to SQL, then you can have streaming queries that sort on an ascending time attribute.

Sorting by anything else in an unbounded streaming query is simply impossible, since sorting requires full knowledge of the input.

Upvotes: 3

Related Questions