I'm using a local Flink 1.6 cluster configured to use the `flink-table` jar (meaning my program's jar does not include `flink-table`).
With the following code:
```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

public class JMain {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnv);
        tableEnv.registerFunction("enlist", new Enlister());

        DataSource<Tuple2<String, String>> source = execEnv.fromElements(
            new Tuple2<>("a", "1"),
            new Tuple2<>("a", "2"),
            new Tuple2<>("b", "3")
        );

        Table table = tableEnv.fromDataSet(source, "a, b")
            .groupBy("a")
            .select("enlist(a, b)");

        tableEnv.toDataSet(table, Row.class)
            .print();
    }

    public static class Enlister
            extends AggregateFunction<List<String>, ArrayList<String>>
            implements ResultTypeQueryable<List<String>>
    {
        @Override
        public ArrayList<String> createAccumulator() {
            return new ArrayList<>();
        }

        @Override
        public List<String> getValue(ArrayList<String> acc) {
            return acc;
        }

        @SuppressWarnings("unused")
        public void accumulate(ArrayList<String> acc, String a, String b) {
            acc.add(a + ":" + b);
        }

        @SuppressWarnings("unused")
        public void merge(ArrayList<String> acc, Iterable<ArrayList<String>> it) {
            for (ArrayList<String> otherAcc : it) {
                acc.addAll(otherAcc);
            }
        }

        @SuppressWarnings("unused")
        public void resetAccumulator(ArrayList<String> acc) {
            acc.clear();
        }

        @Override
        public TypeInformation<List<String>> getProducedType() {
            return TypeInformation.of(new TypeHint<List<String>>(){});
        }
    }
}
```
I get this weird exception:
```
org.apache.flink.table.api.ValidationException: Expression Enlister(List('a, 'b)) failed on input check: Given parameters do not match any signature.
Actual: (java.lang.String, java.lang.String)
Expected: (java.lang.String, java.lang.String)
```
However, if I do not implement `ResultTypeQueryable`, I get the expected output:
```
Starting execution of program
[b:3]
[a:1, a:2]
Program execution finished
Job with JobID 20497bd3efe44fab0092a05a8eb7d9de has finished.
Job Runtime: 270 ms
Accumulator Results:
- 56e0e5a9466b84ae44431c9c4b7aad71 (java.util.ArrayList) [2 elements]
```
My actual use case seems to require `ResultTypeQueryable`, because otherwise I get this exception:
```
The return type of function ... could not be determined automatically,
due to type erasure. You can give type information hints by using the
returns(...) method on the result of the transformation call,
or by letting your function implement the 'ResultTypeQueryable' interface
```
Any way I can fix this?
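The "due to type erasure" part of that message can be seen without Flink at all. The following plain-Java sketch (`ErasureDemo` and its methods are hypothetical names chosen for illustration) shows that a `List<String>` and a `List<Integer>` share one runtime class, and that this class only knows its declared type variable, not the actual type argument:

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    // Returns true because the type arguments <String> and <Integer>
    // are erased: both objects are plain java.util.ArrayList at runtime.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    // The runtime class only exposes its declared type variable ("E"),
    // not the String argument used when the list was created.
    static String declaredTypeParameter() {
        List<String> strings = new ArrayList<>();
        return strings.getClass().getTypeParameters()[0].getName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());      // true
        System.out.println(declaredTypeParameter()); // E
    }
}
```

Because the object itself carries no record of `String`, a framework inspecting it at runtime has to be given the type some other way, which is why the error message asks for an explicit hint.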
Reputation: 5059
I tried to reproduce the problem in a small program but couldn't; it only happens in my larger project.
Unfortunately, overriding `getResultType()` and `getAccumulatorType()` didn't help either; in that case I got this exception:
```
java.lang.IndexOutOfBoundsException
    at org.apache.flink.api.java.typeutils.TupleTypeInfoBase.getTypeAt(TupleTypeInfoBase.java:199)
    at org.apache.flink.api.java.typeutils.RowTypeInfo.getTypeAt(RowTypeInfo.java:179)
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.isSortKey(Keys.java:444)
    at org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:150)
    at org.apache.flink.api.java.operators.SortPartitionOperator.<init>(SortPartitionOperator.java:75)
    at org.apache.flink.api.java.DataSet.sortPartition(DataSet.java:1414)
```
I actually also got that exception even without overriding. The only thing that worked for me was essentially:
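```java
// Declare the result row type explicitly instead of passing Row.class
String[] fieldNames = new String[] {
    "result"
};
TypeInformation<?>[] types = new TypeInformation[] {
    // The anonymous TypeHint subclass preserves the List<String> type argument
    TypeInformation.of(new TypeHint<List<String>>(){})
};
tableEnv.toDataSet(table, Types.ROW(fieldNames, types))...
```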
Reputation: 2654
Implementing `ResultTypeQueryable` is not correct in this case, and the exception message is misleading. Instead, override `getResultType()` and `getAccumulatorType()`. The reason is that generic types usually cause problems when Flink generates type information for serializers: Java's type erasure removes the type arguments that Flink would need to derive it automatically.
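Type arguments can survive erasure when they are written into a class declaration, which is what `new TypeHint<List<String>>(){}` relies on: the trailing `{}` creates an anonymous subclass whose generic superclass, including `List<String>`, is recorded in its class file. In the Flink case, something like `TypeInformation.of(new TypeHint<List<String>>(){})` could therefore be returned from the overridden `getResultType()`. Below is a minimal plain-Java sketch of this "super type token" idea; `TypeToken` and `TypeTokenDemo` are hypothetical classes written for illustration, not Flink's actual `TypeHint` implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeTokenDemo {
    // Hypothetical type token: must be instantiated as an anonymous
    // subclass, e.g. new TypeToken<List<String>>() {}.
    public abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            // getClass() is the anonymous subclass; its generic superclass
            // is TypeToken<...> with the concrete type argument attached.
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }

        public Type getType() {
            return type;
        }
    }

    public static void main(String[] args) {
        Type t = new TypeToken<List<String>>() {}.getType();
        System.out.println(t.getTypeName()); // java.util.List<java.lang.String>
    }
}
```

The design trade-off is that the token only works when the type is fully spelled out at the call site; a `new TypeToken<T>() {}` inside generic code would just capture the type variable `T`.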