Kamil Kamil

Reputation: 15

Setting variables in Apache Flink

I'm having trouble setting variables in Apache Flink. I would like to use one stream to fetch data that I will use to initialize the variables I need for a second stream. The problem is that the streams execute in parallel, so the value is still missing when the second stream is initialized. Sample code:

KafkaSource<Market> mainSource1 = KafkaSource.<Market>builder()
      .setBootstrapServers(...)
      .setTopicPattern(Pattern.compile(...))
      .setGroupId(...)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setDeserializer(new ObjectDeserializer()) // must produce Market records
      .build();

DataStream<Market> mainStream1 = env.fromSource(mainSource1, WatermarkStrategy.forMonotonousTimestamps(), "mainSource1");


// fetching data from the stream and setting variables


Map<TopicPartition, Long> endOffset = new HashMap<>();
endOffset.put(new TopicPartition("topicName", 0), offsetFromMainStream1);



KafkaSource<Market> mainSource2 = KafkaSource.<Market>builder()
      .setBootstrapServers(...)
      .setTopicPattern(Pattern.compile(...))
      .setGroupId(...)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setBounded(OffsetsInitializer.offsets(endOffset)) // stop reading at the offsets taken from mainStream1
      .setDeserializer(new ObjectDeserializer()) // must produce Market records
      .build();

DataStream<Market> mainStream2 = env.fromSource(mainSource2, WatermarkStrategy.forMonotonousTimestamps(), "mainSource2");

// further stream operations


I would like to consume the first stream, fetch the data from it and store it locally, and then use that data in operations on the second stream.

Upvotes: 0

Views: 256

Answers (1)

Shankar

Reputation: 2825

You want to use one stream's data to control another stream's behavior. The best way to do that is the broadcast state pattern.

This involves creating a BroadcastStream from mainStream1 and then connecting mainStream2 to that broadcast stream. The function that processes mainStream2 can then read the data from mainStream1 out of the broadcast state.
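Keep in mind that connecting the streams does not make mainStream2 wait for mainStream1: Flink does not control the order in which the two inputs of a connected stream are consumed, so processElement can receive mainStream2 elements before the matching mainStream1 value has been broadcast. That is the same kind of missing value described in the question. One common remedy is to buffer such elements until the value arrives. Below is a minimal plain-Java model of that buffering logic, with no Flink dependency; the class BufferingJoiner, its methods, and the String key/value types are all hypothetical. In a real job the map would be the broadcast state, the buffer would be keyed state such as a ListState, and flushing it from processBroadcastElement would go through ctx.applyToKeyedState(...).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model (no Flink dependency) of logic you might put inside
 * a KeyedBroadcastProcessFunction: elements of the keyed stream (mainStream2) that
 * arrive before the matching value from the broadcast stream (mainStream1) are
 * buffered and emitted once that value shows up.
 */
class BufferingJoiner {
    // Stands in for Flink's broadcast state: key -> value received from mainStream1
    private final Map<String, String> broadcastState = new HashMap<>();
    // Stands in for per-key buffered elements (in Flink, e.g. a keyed ListState)
    private final Map<String, List<String>> pending = new HashMap<>();

    /** Models processBroadcastElement: store the value, then flush anything waiting for it. */
    List<String> onBroadcast(String key, String value) {
        broadcastState.put(key, value);
        List<String> out = new ArrayList<>();
        List<String> waiting = pending.remove(key);
        if (waiting != null) {
            for (String element : waiting) {
                out.add(combine(element, value));
            }
        }
        return out;
    }

    /** Models processElement: emit immediately if the value is known, otherwise buffer. */
    List<String> onElement(String key, String element) {
        String value = broadcastState.get(key);
        if (value == null) {
            pending.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
            return List.of();
        }
        return List.of(combine(element, value));
    }

    // Hypothetical "use the value from mainStream1" step: just tags the element with it
    private static String combine(String element, String value) {
        return element + "@" + value;
    }
}
```

For example, calling onElement("id-1", "a") first returns an empty list because the element is buffered, and a later onBroadcast("id-1", "cfg") returns [a@cfg].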

Here is a high-level example based on your code. I am assuming that the key is a String.

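// Broadcast state descriptor: String key -> latest Market received from mainStream1
MapStateDescriptor<String, Market> stateDescriptor = new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Market>() {}));

// broadcast mainStream1 to every parallel instance and create the broadcast state
BroadcastStream<Market> mainStream1BroadcastStream = mainStream1.broadcast(stateDescriptor);

DataStream<Market> yourOutput = mainStream2
                 .keyBy(market -> market.getId()) // assumes Market has a String getId()
                 .connect(mainStream1BroadcastStream)
                 .process(new KeyedBroadcastProcessFunction<String, Market, Market, Market>() {
                     @Override
                     public void processBroadcastElement(Market value, Context ctx, Collector<Market> out) throws Exception {
                         // store each element of mainStream1 in the broadcast state
                         ctx.getBroadcastState(stateDescriptor).put(value.getId(), value);
                     }

                     @Override
                     public void processElement(Market value, ReadOnlyContext ctx, Collector<Market> out) throws Exception {
                         // read what mainStream1 stored (null if it has not arrived yet)
                         Market fromStream1 = ctx.getBroadcastState(stateDescriptor).get(value.getId());
                         // ... use fromStream1 together with value here
                         out.collect(value);
                     }
                 });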

This concept is explained in detail in the Flink documentation on the broadcast state pattern, and the code above is a modified version of the example shown there: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#the-broadcast-state-pattern

Upvotes: 1
