I'm new to Flink. I have two DataStreams and I want to apply a keyed join in a tumbling window. The code runs without errors, but the join never produces any result. I even apply assignTimestampsAndWatermarks both on the streams being joined and on fromSource.
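Both streams feed a single join, so it can help to see how their watermarks combine. Below is a simplified, stdlib-only model, not Flink code. `WatermarkModel`, `MonotonousGenerator` and `combined` are hypothetical names. The generator only imitates the rule used by `forMonotonousTimestamps()`, where the watermark equals the largest timestamp seen so far minus 1 ms. The model assumes, as Flink does for operators with more than one input, that the operator doing the join advances only to the smaller of the incoming watermarks.

```java
// Simplified, stdlib-only model of event-time watermarks (NOT Flink code).
// Class and method names are hypothetical, chosen for illustration.
class WatermarkModel {

    // Imitates forMonotonousTimestamps(): the watermark trails the largest
    // timestamp seen so far by 1 ms, and is Long.MIN_VALUE before any event.
    static final class MonotonousGenerator {
        private long maxTimestamp = Long.MIN_VALUE + 1;

        void onEvent(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        long currentWatermark() {
            return maxTimestamp - 1;
        }
    }

    // An operator fed by both streams advances only to the smaller watermark.
    static long combined(long watermarkA, long watermarkB) {
        return Math.min(watermarkA, watermarkB);
    }

    public static void main(String[] args) {
        MonotonousGenerator a = new MonotonousGenerator();
        MonotonousGenerator b = new MonotonousGenerator();
        a.onEvent(1_000L);
        a.onEvent(12_000L);
        // Input B has not seen any event yet, so it holds the combined watermark back.
        System.out.println(combined(a.currentWatermark(), b.currentWatermark()) == Long.MIN_VALUE); // prints true
        b.onEvent(7_000L);
        // Now the combined watermark is min(11999, 6999).
        System.out.println(combined(a.currentWatermark(), b.currentWatermark())); // prints 6999
    }
}
```

In this model, an input that has not received any events keeps the combined watermark at `Long.MIN_VALUE`, so no event-time window downstream of it can fire.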
KafkaSource<ConsumerRecord> iotA = KafkaSource.<ConsumerRecord>builder()
        .setBootstrapServers(IP)
        .setTopics("iotA")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setDeserializer(KafkaRecordDeserializationSchema.of(new KafkaDeserializationSchema<ConsumerRecord>() {
            @Override
            public boolean isEndOfStream(ConsumerRecord record) {
                return false;
            }
            @Override
            public ConsumerRecord deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                String key = new String(record.key(), StandardCharsets.UTF_8);
                String value = new String(record.value(), StandardCharsets.UTF_8);
                return new ConsumerRecord(
                        record.topic(),
                        record.partition(),
                        record.offset(),
                        record.timestamp(),
                        record.timestampType(),
                        record.checksum(),
                        record.serializedKeySize(),
                        record.serializedValueSize(),
                        key,
                        value
                );
            }
            @Override
            public TypeInformation<ConsumerRecord> getProducedType() {
                TypeInformation<ConsumerRecord> typeInfo = TypeInformation.of(ConsumerRecord.class);
                return typeInfo;
            }
        }))
        .build();
KafkaSource<ConsumerRecord> iotB = //same as iotA
DataStream<ConsumerRecord> iotA_datastream = env.fromSource(iotA,
        WatermarkStrategy.<ConsumerRecord>forMonotonousTimestamps()
                .withTimestampAssigner((record, timestamp) -> record.timestamp()),
        "Kafka Source");
DataStream<ConsumerRecord> iotB_datastream = //same as iotA_datastream
DataStream<ConsumerRecord> mapped_iotA = iotA_datastream.map(new MapFunction<ConsumerRecord, ConsumerRecord>() {
    @Override
    public ConsumerRecord map(ConsumerRecord record) throws Exception {
        String new_value = splitValue((String) record.value(), 0);
        return new ConsumerRecord(record.topic(), record.partition(), record.offset(), record.timestamp(), record.timestampType(),
                record.checksum(), record.serializedKeySize(), record.serializedValueSize(), record.key(), new_value);
    }
}).assignTimestampsAndWatermarks(WatermarkStrategy.<ConsumerRecord>forMonotonousTimestamps()
        .withTimestampAssigner((record, timestamp) -> record.timestamp()));
DataStream<ConsumerRecord> mapped_iotB = //same as mapped_iotA
DataStream<String> joined_stream = mapped_iotA.join(mapped_iotB)
        .where(new KeySelector<ConsumerRecord, String>() {
            @Override
            public String getKey(ConsumerRecord record) throws Exception {
                System.out.println((String) record.key() + record.value());
                return (String) record.key();
            }
        })
        .equalTo(new KeySelector<ConsumerRecord, String>() {
            @Override
            public String getKey(ConsumerRecord record) throws Exception {
                System.out.println((String) record.key() + record.value());
                return (String) record.key();
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new JoinFunction<ConsumerRecord, ConsumerRecord, String>() {
            @Override
            public String join(ConsumerRecord record1, ConsumerRecord record2) throws Exception { // never prints anything
                System.out.println("value1" + record1.value() + "value2" + record2.value());
                return "null";
            }
        });
env.execute();
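For reference, the result that the keyed tumbling-window join above is meant to compute can be modelled in plain Java. This is a sketch under stated assumptions, not Flink's implementation: `WindowJoinModel`, `Event` and the sample keys are hypothetical, windows are assumed to be epoch-aligned (offset 0), and timing is ignored entirely. The join function is called once for every pair of elements with equal keys in the same window, so a key or window that appears on only one side yields nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model (NOT Flink code) of the result of a keyed tumbling-window join.
// Names are hypothetical. Timing is ignored: it only shows WHICH pairs are joined.
class WindowJoinModel {

    static final class Event {
        final String key;
        final long timestamp;
        final String value;

        Event(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Epoch-aligned tumbling window (offset 0) containing the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Inner join: one output per (left, right) pair with equal keys in the same window.
    static List<String> join(List<Event> left, List<Event> right, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Event a : left) {
            for (Event b : right) {
                boolean sameKey = a.key.equals(b.key);
                boolean sameWindow = windowStart(a.timestamp, windowSize) == windowStart(b.timestamp, windowSize);
                if (sameKey && sameWindow) {
                    // Same string format as the JoinFunction in the question.
                    out.add("value1" + a.value + "value2" + b.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(
                new Event("sensor-1", 1_000L, "a1"),
                new Event("sensor-1", 6_000L, "a2"),   // falls in the next 5 s window
                new Event("sensor-2", 2_000L, "a3"));  // key absent on the right
        List<Event> right = List.of(
                new Event("sensor-1", 3_000L, "b1"),
                new Event("sensor-1", 4_000L, "b2"),
                new Event("sensor-3", 2_500L, "b3"));  // key absent on the left
        System.out.println(join(left, right, 5_000L)); // prints [value1a1value2b1, value1a1value2b2]
    }
}
```

In Flink itself these pairs are only emitted once the event-time watermark passes the end of their window.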
I also tried other watermark strategies, such as forBoundedOutOfOrderness, and wider windows, with the same result.
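The effect of using wider windows can be reasoned about with a small model of when an event-time tumbling window fires. This is a stdlib-only sketch, not Flink's implementation: `TumblingWindowTiming` and its methods are hypothetical names, and it assumes epoch-aligned windows (offset 0) plus the rule that an event-time window fires once the watermark reaches the window's last millisecond (end - 1). A wider window only moves that threshold later, so in this model it cannot produce output if the watermark never advances.

```java
// Stdlib-only sketch (NOT Flink's implementation) of when an event-time
// tumbling window fires. Names are hypothetical; offset 0 is assumed.
class TumblingWindowTiming {

    // Epoch-aligned window [start, start + size) that contains the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // The window fires once the watermark reaches its last millisecond: end - 1.
    static long firingWatermark(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    static boolean fires(long timestamp, long size, long watermark) {
        return watermark >= firingWatermark(timestamp, size);
    }

    public static void main(String[] args) {
        long event = 12_345L;
        System.out.println(firingWatermark(event, 5_000L));        // prints 14999
        System.out.println(firingWatermark(event, 60_000L));       // prints 59999
        System.out.println(fires(event, 5_000L, 14_999L));         // prints true
        System.out.println(fires(event, 60_000L, 14_999L));        // prints false
        // A watermark that never advances fires no window, whatever its size.
        System.out.println(fires(event, 60_000L, Long.MIN_VALUE)); // prints false
    }
}
```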
Try attaching a sink to the job. For example:
DataStream<String> joined_stream= mapped_iotA.join(mapped_iotB)
...
joined_stream.print();
env.execute();