I'm just getting started with Apache Flink and I'm trying to aggregate some values ingested from a Kafka topic. Here's the code I'm using:
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class EnvironmentMeasuresJob {
    public static void main(String[] args) {
        // Pure Table API job running in streaming mode
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);

        // Kafka-backed source table: `timestamp` is read from the Kafka record metadata
        // and used as event time, with a 5-second watermark delay
        tEnv.executeSql("CREATE TABLE EnvironmentMeasures (" +
                "`timestamp` TIMESTAMP(3) METADATA FROM 'timestamp'," +
                "`area` STRING," +
                "`sensor` STRING," +
                "`co` DECIMAL(5, 2)," +
                "`pm1` DECIMAL(5, 2)," +
                "`pm25` DECIMAL(5, 2)," +
                "`pm10` DECIMAL(5, 2)," +
                "`temperature` DECIMAL(5, 2)," +
                "`pressure` DECIMAL(5, 2)," +
                "`humidity` DECIMAL(5, 2)," +
                "WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND" +
                ") WITH (" +
                "'connector' = 'kafka'," +
                "'topic' = 'seneca.environmentmeasures.raw'," +
                "'properties.bootstrap.servers' = 'localhost:9092'," +
                "'properties.group.id' = 'env-measures-job'," +
                "'scan.startup.mode' = 'earliest-offset'," +
                "'format' = 'json'," +
                "'json.fail-on-missing-field' = 'false'," +
                "'json.ignore-parse-errors' = 'false'" +
                ")");

        Table environmentMeasures = tEnv.from("EnvironmentMeasures");

        // 20-second event-time windows sliding every 10 seconds, grouped by sensor
        Table aggregatedEnvironmentMeasures = environmentMeasures
                .window(Slide.over(lit(20).seconds())
                        .every(lit(10).seconds())
                        .on($("timestamp"))
                        .as("w"))
                .groupBy($("sensor"), $("w"))
                .select(
                        $("w").end().as("timestamp"),
                        $("area"),
                        $("sensor"),
                        $("co").avg().as("averageCO"),
                        $("pm1").avg().as("averagePM1"),
                        $("pm25").avg().as("averagePM25"),
                        $("pm10").avg().as("averagePM10"),
                        $("temperature").avg().as("averageTemperature"),
                        $("pressure").avg().as("averagePressure"),
                        $("humidity").avg().as("averageHumidity")
                );
    }
}
But when I try to execute the code I get the following exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve field [area], input field list:[sensor, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7, EXPR$0].
If I remove "area" from the select, everything works. Any idea why this happens? Am I missing something? Thanks.
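I think you need to either group by `area` as well or compute some aggregation over the areas included in the given sensor and window. In a grouped window aggregation, each output row stands for one combination of the grouping keys (here `sensor` and the window `w`), so every column in the `select` must be either a grouping key or an aggregate. A bare `area` is neither: a single row may cover many input records, and Flink has no rule for which of their `area` values to emit. That is why the validator can't resolve it; the input field list in the error contains only the key `sensor` plus eight computed expressions (`EXPR$0` to `EXPR$7`, i.e. the window end and the seven averages). Concretely, either change the grouping to `.groupBy($("sensor"), $("area"), $("w"))`, which yields the same rows if each sensor belongs to a single area, or keep the grouping and select an aggregate such as `$("area").max().as("area")`.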
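To make the rule concrete, here is a small plain-Java model (it does not use Flink at all) of what the windowed aggregation computes: readings are assigned to 20-second windows sliding every 10 seconds and averaged per group. The first method adds `area` to the grouping key; the second keeps `sensor` as the only key and reduces `area` with a MAX-style aggregate. The class, record, and method names and the sample readings are hypothetical, and the window assignment assumes window starts aligned to multiples of the slide, which I believe matches Flink's default zero-offset alignment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupedWindowModel {

    // A single reading; only the columns needed for the illustration (hypothetical).
    record Measure(long tsMillis, String area, String sensor, double co) {}

    // Grouping key when `area` is part of the GROUP BY (option 1).
    record SensorAreaKey(long windowEnd, String sensor, String area) {}

    // Grouping key when only `sensor` is grouped (option 2).
    record SensorKey(long windowEnd, String sensor) {}

    // Option 2 result: `area` must be collapsed by an aggregate, here MAX.
    record AreaAndAvg(String maxArea, double averageCO) {}

    // Ends of all sliding windows [start, start + size) that contain ts,
    // with window starts aligned to multiples of `slide`.
    static List<Long> windowEnds(long ts, long size, long slide) {
        List<Long> ends = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            ends.add(start + size);
        }
        return ends;
    }

    // Option 1: GROUP BY window, sensor, area -> every group has exactly one area.
    static Map<SensorAreaKey, Double> avgBySensorAndArea(List<Measure> in, long size, long slide) {
        Map<SensorAreaKey, double[]> acc = new HashMap<>(); // key -> {sum, count}
        for (Measure m : in) {
            for (long end : windowEnds(m.tsMillis(), size, slide)) {
                double[] a = acc.computeIfAbsent(
                        new SensorAreaKey(end, m.sensor(), m.area()), k -> new double[2]);
                a[0] += m.co();
                a[1] += 1;
            }
        }
        Map<SensorAreaKey, Double> out = new HashMap<>();
        acc.forEach((k, a) -> out.put(k, a[0] / a[1]));
        return out;
    }

    // Option 2: GROUP BY window, sensor -> area reduced with MAX (lexicographic).
    static Map<SensorKey, AreaAndAvg> avgBySensorWithMaxArea(List<Measure> in, long size, long slide) {
        Map<SensorKey, double[]> acc = new HashMap<>(); // key -> {sum, count}
        Map<SensorKey, String> maxArea = new HashMap<>();
        for (Measure m : in) {
            for (long end : windowEnds(m.tsMillis(), size, slide)) {
                SensorKey key = new SensorKey(end, m.sensor());
                double[] a = acc.computeIfAbsent(key, k -> new double[2]);
                a[0] += m.co();
                a[1] += 1;
                maxArea.merge(key, m.area(), (x, y) -> x.compareTo(y) >= 0 ? x : y);
            }
        }
        Map<SensorKey, AreaAndAvg> out = new HashMap<>();
        acc.forEach((k, a) -> out.put(k, new AreaAndAvg(maxArea.get(k), a[0] / a[1])));
        return out;
    }

    public static void main(String[] args) {
        // One sensor reporting from two areas; 20 s windows sliding every 10 s.
        List<Measure> in = List.of(
                new Measure(1_000, "north", "s1", 1.0),
                new Measure(5_000, "south", "s1", 3.0),
                new Measure(12_000, "north", "s1", 5.0));
        System.out.println(avgBySensorAndArea(in, 20_000, 10_000));
        System.out.println(avgBySensorWithMaxArea(in, 20_000, 10_000));
    }
}
```

With readings from one sensor in two areas, the first method emits separate rows per area, while the second emits one row per window carrying a single (lexicographically largest) area. A query that selects a bare `area` gives no such rule, which is exactly what the validator rejects.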