gaurav miglani

Reputation: 315

NPE in Flink KeyedCoProcessFunction

I am using a KeyedCoProcessFunction on a connected stream where both streams are keyed by the lookup id (the lookup stream by its id field, the click stream by lookupid). I'm using MapState, putting a list value when the key doesn't exist yet, and I check that the key exists in processElement2 before reading it, so ideally there is no chance of an NPE, but I'm still getting one.

DataStream<Row> joinStream = lookDataStream.keyBy(row -> row.<Long>getFieldAs("id"))
        .connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
        .process(new EnrichJoinFunction());

public static class EnrichJoinFunction
  extends KeyedCoProcessFunction<Long, Row, Row, Row> {


final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

private MapState<Long, Row> map = null;
private MapState<Long, List<Row>> clickstreamState = null;

@Override
public void open(Configuration parameters) throws Exception {
  MapStateDescriptor<Long, Row> lookupStateDescriptor =
      new MapStateDescriptor<Long, Row>(
          "state",
          TypeInformation.of(Long.class),
          TypeInformation.of(new TypeHint<Row>() {}));
  lookupStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
  map = getRuntimeContext().getMapState(lookupStateDescriptor);

  MapStateDescriptor<Long, List<Row>> clickstreamStateMapStateDescriptor =
      new MapStateDescriptor<Long, List<Row>>(
          "clickstreamState",
          TypeInformation.of(Long.class),
          TypeInformation.of(new TypeHint<List<Row>>() {}));
  clickstreamStateMapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(5)).build());
  clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
}

@Override
public void processElement1(
    Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
    throws Exception {
  log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
  Long id = lookupRow.<Long>getFieldAs("id");
  if (!map.contains(id)) {
    map.put(id, lookupRow);
  }

  // join any click events that were buffered while waiting for this lookup row
  if (clickstreamState.contains(id)) {
    for (Row curRow : clickstreamState.get(id)) {
      // enrich join
      Row joinRow = join(curRow, lookupRow);
      out.collect(joinRow);
    }
    clickstreamState.remove(id);
  }
}

@Override
public void processElement2(
    Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
    throws Exception {
  log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));

  // the click stream is keyed by "lookupid", so use that field as the join key
  Long id = clickRow.<Long>getFieldAs("lookupid");

  if (map.contains(id)) {
    // enrich join
    Row joinRow = join(clickRow, map.get(id));
    out.collect(joinRow);
  } else {
    if (clickstreamState.contains(id)) {
      List<Row> rows = clickstreamState.get(id);
      if (rows != null) {
        rows.add(clickRow);
        // write the list back: with serializing state backends (e.g. RocksDB),
        // mutating the object returned by get() does not update the stored state
        clickstreamState.put(id, rows);
      } else {
        throw new NullPointerException("This exception should never throw NPE");
      }
    } else {
      List<Row> clickList = new ArrayList<>();
      clickList.add(clickRow);
      clickstreamState.put(id, clickList);
    }
  }
}

public Row join(Row clickRow, Row lookupRow) throws ParseException {
  Row joinedRow = new Row(RowKind.INSERT, 13);
  // set the fields of the joined output row from clickRow and lookupRow (omitted)
  return joinedRow;
}
}

Upvotes: 0

Views: 187

Answers (1)

kkrugler

Reputation: 9245

Fundamentally, I think the implementation is flawed. When your processElement1 and processElement2 methods are called, the operator's state is already scoped to the current key. So there's no need for a MapState<Long, Row> or MapState<Long, List<Row>> indexed by id; you just want a ValueState<Row> and a ListState<Row>.
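To make this concrete without needing a Flink cluster, here is a small plain-Java toy model. It is only a sketch under stated assumptions: KeyContext, ToyValueState, ToyListState and ToyEnrichJoin are hypothetical names (not Flink classes), and String stands in for Row. It assumes, as Flink's runtime does for keyed operators, that the current key is set before each callback, so the join logic needs just one lookup value and one list of pending clicks, with no map indexed by id.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, NOT Flink's API. The "runtime" sets the current key
// before each callback; every state object then reads/writes only that key's slot.
final class KeyContext {
    Long currentKey;
}

// Stand-in for ValueState<T>: value()/update() only touch the current key's slot.
final class ToyValueState<T> {
    private final Map<Long, T> slots = new HashMap<>();
    private final KeyContext ctx;

    ToyValueState(KeyContext ctx) { this.ctx = ctx; }

    T value() { return slots.get(ctx.currentKey); }

    void update(T v) { slots.put(ctx.currentKey, v); }
}

// Stand-in for ListState<T>: get()/add()/clear() only touch the current key's slot.
final class ToyListState<T> {
    private final Map<Long, List<T>> slots = new HashMap<>();
    private final KeyContext ctx;

    ToyListState(KeyContext ctx) { this.ctx = ctx; }

    List<T> get() { return slots.getOrDefault(ctx.currentKey, Collections.emptyList()); }

    void add(T v) { slots.computeIfAbsent(ctx.currentKey, k -> new ArrayList<>()).add(v); }

    void clear() { slots.remove(ctx.currentKey); }
}

// The enrichment join written against per-key state: no id-keyed map inside.
final class ToyEnrichJoin {
    private final KeyContext ctx = new KeyContext();
    private final ToyValueState<String> lookup = new ToyValueState<>(ctx);
    private final ToyListState<String> pendingClicks = new ToyListState<>(ctx);

    // Like processElement1: remember the lookup row, then flush buffered clicks.
    void processLookup(long key, String lookupRow, List<String> out) {
        ctx.currentKey = key; // in Flink, the runtime does this before calling you
        if (lookup.value() == null) {
            lookup.update(lookupRow);
        }
        for (String click : pendingClicks.get()) {
            out.add(join(click, lookupRow));
        }
        pendingClicks.clear();
    }

    // Like processElement2: join right away if the lookup row is known, else buffer.
    void processClick(long key, String clickRow, List<String> out) {
        ctx.currentKey = key;
        String lookupRow = lookup.value();
        if (lookupRow != null) {
            out.add(join(clickRow, lookupRow));
        } else {
            pendingClicks.add(clickRow);
        }
    }

    private static String join(String clickRow, String lookupRow) {
        return clickRow + "+" + lookupRow;
    }
}
```

In real Flink code the same shape would register a ValueStateDescriptor<Row> and a ListStateDescriptor<Row> in open() via getRuntimeContext().getState(...) and getRuntimeContext().getListState(...), then use value()/update() and get()/add()/clear() in processElement1 and processElement2. The TTL settings from the question can be kept by calling enableTimeToLive on those descriptors.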

Upvotes: 1
