I am using a KeyedCoProcessFunction on a connected stream; both streams are keyed by id. I'm also using MapState: I put a value of type List when the key doesn't exist yet, and in processElement2 I check that the key exists before reading it. Ideally there should be no chance of an NPE, but I'm still getting one.
DataStream<Row> joinStream = lookDataStream
        .keyBy(row -> row.<Long>getFieldAs("id"))
        .connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
        .process(new EnrichJoinFunction());

public static class EnrichJoinFunction
        extends KeyedCoProcessFunction<Long, Row, Row, Row> {

    final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};
    private MapState<Long, Row> map = null;
    private MapState<Long, List<Row>> clickstreamState = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<Long, Row> lookupStateDescriptor =
                new MapStateDescriptor<Long, Row>(
                        "state",
                        TypeInformation.of(Long.class),
                        TypeInformation.of(new TypeHint<Row>() {}));
        lookupStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
        map = getRuntimeContext().getMapState(lookupStateDescriptor);

        MapStateDescriptor<Long, List<Row>> clickstreamStateMapStateDescriptor =
                new MapStateDescriptor<Long, List<Row>>(
                        "clickstreamState",
                        TypeInformation.of(Long.class),
                        TypeInformation.of(new TypeHint<List<Row>>() {}));
        clickstreamStateMapStateDescriptor.enableTimeToLive(
                StateTtlConfig.newBuilder(Time.minutes(5)).build());
        clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
    }

    @Override
    public void processElement1(
            Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
            throws Exception {
        log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
        Long id = lookupRow.<Long>getFieldAs("id");
        if (!map.contains(id)) {
            map.put(id, lookupRow);
        }
        // immediately join any buffered click events that were waiting for this lookup row
        if (clickstreamState.contains(id)) {
            for (Row curRow : clickstreamState.get(id)) {
                // enrich join: one output row per buffered click
                Row joinRow = join(curRow, lookupRow);
                out.collect(joinRow);
            }
            clickstreamState.remove(id);
        }
    }

    @Override
    public void processElement2(
            Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
            throws Exception {
        log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
        Long id = clickRow.<Long>getFieldAs("id");
        if (map.contains(id)) {
            // enrich join
            Row joinRow = join(clickRow, map.get(id));
            out.collect(joinRow);
        } else {
            if (clickstreamState.contains(id)) {
                List<Row> rows = clickstreamState.get(id);
                if (rows != null) {
                    rows.add(clickRow);
                    // write the list back: with the RocksDB backend, get() returns a deserialized copy
                    clickstreamState.put(id, rows);
                } else {
                    throw new NullPointerException("This should never happen: contains() was true but get() returned null");
                }
            } else {
                List<Row> clickList = new ArrayList<>();
                clickList.add(clickRow);
                clickstreamState.put(id, clickList);
            }
        }
    }

    public Row join(Row clickRow, Row lookupRow) throws ParseException {
        Row joinedRow = new Row(RowKind.INSERT, 13);
        // row setters for the join output (omitted)
        return joinedRow;
    }
}
Fundamentally, I think the implementation is flawed. When your processElement1 and processElement2 methods are called, the operator's state is already scoped to the current key value. Keying a MapState by that same id just adds a redundant second level of keying, so there's no need for MapState<Long, Row> or MapState<Long, List<Row>> state; you just want a ValueState<Row> and a ListState<Row>.
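A minimal sketch of that suggestion, assuming Flink 1.x (the `open(Configuration)` signature and `Time`-based `StateTtlConfig` builder that the question already uses). The state names `lookupState` and `pendingClicks` are illustrative, and `join` here is a hypothetical stand-in that simply concatenates the two rows with `Row.join`; the question's real 13-field mapping would go there instead. Every state access is implicitly for the current key, so no id lookups are needed.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/** Enrichment join whose state is implicitly scoped to the current key (the id). */
public class EnrichJoinFunction extends KeyedCoProcessFunction<Long, Row, Row, Row> {

    // Lookup row for the current key; null until one has arrived.
    private transient ValueState<Row> lookupState;
    // Click rows for the current key that arrived before their lookup row.
    private transient ListState<Row> pendingClicks;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Row> lookupDescriptor =
                new ValueStateDescriptor<>("lookup", TypeInformation.of(Row.class));
        lookupDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
        lookupState = getRuntimeContext().getState(lookupDescriptor);

        ListStateDescriptor<Row> clicksDescriptor =
                new ListStateDescriptor<>("pendingClicks", TypeInformation.of(Row.class));
        clicksDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(5)).build());
        pendingClicks = getRuntimeContext().getListState(clicksDescriptor);
    }

    @Override
    public void processElement1(Row lookupRow, Context ctx, Collector<Row> out) throws Exception {
        // Keep the first lookup row seen for this key, as the question's code does.
        if (lookupState.value() == null) {
            lookupState.update(lookupRow);
        }
        // Flush clicks that were waiting for this key's lookup row.
        Iterable<Row> buffered = pendingClicks.get();
        if (buffered != null) {
            for (Row clickRow : buffered) {
                out.collect(join(clickRow, lookupRow));
            }
        }
        pendingClicks.clear();
    }

    @Override
    public void processElement2(Row clickRow, Context ctx, Collector<Row> out) throws Exception {
        Row lookupRow = lookupState.value();
        if (lookupRow != null) {
            out.collect(join(clickRow, lookupRow));
        } else {
            // ListState.add appends directly; no read-modify-write of a java.util.List.
            pendingClicks.add(clickRow);
        }
    }

    // Hypothetical stand-in for the real field mapping: concatenates click and lookup fields.
    private static Row join(Row clickRow, Row lookupRow) {
        return Row.join(clickRow, lookupRow);
    }
}
```

The `keyBy(...).connect(...).process(new EnrichJoinFunction())` wiring from the question stays the same. Because `ListState.add` appends in place, there is no retrieved list to mutate and forget to write back, and there is no `contains`/`get` pair whose results could disagree. `TypeInformation.of(Row.class)` falls back to generic (Kryo) serialization; supplying a `RowTypeInfo` for the actual schema would likely be more efficient.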