epb

Reputation: 484

Flink: how to store state and use in another stream?

I have a use-case for Flink where I need to read information from a file, store each line, and then use this state to filter another stream.

I have all of this working right now with the connect operator and a RichCoFlatMapFunction, but it feels overly complicated. Also, I'm concerned that flatMap2 could begin executing before all of the state is loaded from the file:

fileStream
    .connect(partRecordStream.keyBy((KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId()))
    .keyBy((KeySelector<String, String>) partId -> partId, (KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId())
    .flatMap(new RichCoFlatMapFunction<String, PartRecord, PartRecord>() {
        private transient ValueState<String> storedPartId;
        @Override
        public void flatMap1(String partId, Collector<PartRecord> out) throws Exception {
            // store state
            storedPartId.update(partId);
        }

        @Override
        public void flatMap2(PartRecord record, Collector<PartRecord> out) throws Exception {
            if (record.getPartId().equals(storedPartId.value())) {
                out.collect(record);
            } else {
                // do nothing
            }
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<String> descriptor =
                    new ValueStateDescriptor<>(
                            "partId", // the state name
                            TypeInformation.of(new TypeHint<String>() {}),
                            null);
            storedPartId = getRuntimeContext().getState(descriptor);
        }
    });

Is there a better way (as of Flink 1.1.3) to accomplish this pattern of loading state, then using it in subsequent streams?

Upvotes: 4

Views: 2556

Answers (1)

Fabian Hueske

Reputation: 18987

Your concerns about the CoFlatMapFunction are correct. The order in which flatMap1 and flatMap2 are called cannot be controlled and depends on the order in which the data arrives. So, flatMap2 might be called before flatMap1 has read all of the data.

The only way in Flink 1.1.3 to read all data before starting to process a stream is to consume the data in the open() method of a RichFlatMapFunction, i.e., you have to manually read and parse the file.
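A minimal sketch of that approach could look like the following. PartIdFilter and the file path are illustrative names (not from the question), and the sketch assumes the file contains one partId per line and is readable from every TaskManager, e.g. via a shared or distributed file system:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.HashSet;
    import java.util.Set;

    public class PartIdFilter extends RichFlatMapFunction<PartRecord, PartRecord> {

        private final String filePath;
        private transient Set<String> partIds;

        public PartIdFilter(String filePath) {
            this.filePath = filePath;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // read the whole file before any record is processed, so the
            // "state" is complete when flatMap() is called for the first time
            partIds = new HashSet<>();
            try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    partIds.add(line.trim());
                }
            }
        }

        @Override
        public void flatMap(PartRecord record, Collector<PartRecord> out) {
            // forward only records whose partId appeared in the file
            if (partIds.contains(record.getPartId())) {
                out.collect(record);
            }
        }
    }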

This is basically a broadcast join strategy, i.e., each parallel instance of the operator will do this. The drawback is that the data of the file will be replicated. The benefit is that you do not have to shuffle the "main" stream (no need to use keyBy()).
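Applied to the stream from the question, usage might look like this (again just a sketch; the file path is a placeholder):

    // no keyBy() needed: every parallel instance holds the full partId set
    DataStream<PartRecord> filtered = partRecordStream
        .flatMap(new PartIdFilter("/path/to/partIds.txt"));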

Upvotes: 4
