puneet kinra

Reputation: 1

Race condition in Flink connected streams (Apache Flink)

I am facing a race condition while implementing a process function on Flink connected streams. I have a cache map that is shared between the two functions processElement1 and processElement2, which are called in parallel by two different threads.

Streams1 ---> (sending offer data)

Streams2 ---> (sending LMS (loyalty management system) data)

connect = Streams1.connect(Streams2);

connect.process(new TriggerStream());

In the TriggerStream class I store the data using a unique ID, MemberId, as the key to store and look up data in the cache. When the data is flowing in, I am not getting consistent results.

import java.util.LinkedHashMap;
import java.util.Map;

class LRUConcurrentCache<K,V> {
    private final Map<K,V> cache;
    private final int maxEntries;

    public LRUConcurrentCache(final int maxEntries) {
        this.maxEntries = maxEntries;
        // Access-ordered LinkedHashMap: evicts the least recently used entry
        // once the map grows beyond maxEntries.
        this.cache = new LinkedHashMap<K,V>(maxEntries, 0.75F, true) {
            private static final long serialVersionUID = -1236481390177598762L;
            @Override
            protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Note: locking on the key does not work. Two equal keys can be distinct
    // objects (so they would not share a lock), and an access-ordered
    // LinkedHashMap mutates its internal links even on get(), so every access
    // must be guarded by one lock on the map itself.
    public void put(K key, V value) {
        synchronized(cache) {
            cache.put(key, value);
        }
    }

    public V get(K key) {
        synchronized(cache) {
            return cache.get(key);
        }
    }
}



import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

public class TriggerStream extends CoProcessFunction<IOffer, LMSData, String> {

    private static final long serialVersionUID = 1L;
    private static final Logger applicationlogger = LoggerFactory.getLogger(TriggerStream.class);

    LRUConcurrentCache<String, String> cache;
    private String offerNode;
    String updatedValue, retrievedValue;
    Subscriber subscriber;

    TriggerStream() {
        this.cache = new LRUConcurrentCache<>(10);
    }



    @Override
    public void processElement1(IOffer offer, Context ctx, Collector<String> out) throws Exception {
        try {
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
            mapper.enableDefaultTyping();
            IOffer latestOffer = offer;

            // Check whether the subscriber is already in the cache
            retrievedValue = cache.get(latestOffer.getMemberId().toString());
            if (retrievedValue == null) {
                // Subscriber is converted to a JSON string and stored in the cache
                Subscriber subscriber = new Subscriber();
                subscriber.setMemberId(latestOffer.getMemberId());
                ArrayList<IOffer> offerList = new ArrayList<IOffer>();
                offerList.add(latestOffer);
                subscriber.setOffers(offerList);
                updatedValue = mapper.writeValueAsString(subscriber);
                cache.put(subscriber.getMemberId().toString(), updatedValue);
            } else {
                Subscriber subscriber = mapper.readValue(retrievedValue, Subscriber.class);
                List<IOffer> offers = subscriber.getOffers();
                offers.add(latestOffer);
                updatedValue = mapper.writeValueAsString(subscriber);
                cache.put(subscriber.getMemberId().toString(), updatedValue);
            }
        } catch (Exception pb) {
            applicationlogger.error("Exception in offer loading:" + pb);
        }
        applicationlogger.debug("*************************FINISHED OFFER LOADING*******************************");
    }

    @Override
    public void processElement2(LMSData lms, Context ctx, Collector<String> out) throws Exception {
        try {
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
            mapper.enableDefaultTyping();

            // Check whether the subscriber is already in the cache
            retrievedValue = cache.get(lms.getMemberId().toString());
            if (retrievedValue != null) {
                Subscriber subscriber = mapper.readValue(retrievedValue, Subscriber.class);
                // do some calculations
                String updatedValue = mapper.writeValueAsString(subscriber);
                // Update the cached value
                cache.put(subscriber.getMemberId().toString(), updatedValue);
            }
        } catch (Exception pb) {
            applicationlogger.error("Exception in LMS loading:" + pb);
        }
        applicationlogger.debug("*************************FINISHED LMS LOADING*******************************");
    }

}   

Upvotes: 0

Views: 736

Answers (1)

Fabian Hueske

Reputation: 18987

Flink gives no guarantees about the order in which a CoProcessFunction (or any other Co*Function) ingests data from its two inputs. Maintaining any kind of deterministic order across distributed, parallel tasks would be too expensive.

Instead, you have to work around that in your function with state and possibly timers. The LRUCache in your function should be maintained as state (probably keyed state); otherwise, it will be lost in case of a failure. You can add another state for the first stream and buffer its records until the lookup value from the second stream has arrived, as sketched below.
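A minimal sketch of that approach, assuming both streams are keyed by member ID before they are connected and that getMemberId() returns a String. The class name StatefulTriggerStream and the state names are illustrative, not from the question:

import java.util.ArrayList;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical rewrite of TriggerStream on keyed inputs, e.g.:
//   offers.keyBy(IOffer::getMemberId)
//         .connect(lms.keyBy(LMSData::getMemberId))
//         .process(new StatefulTriggerStream());
public class StatefulTriggerStream extends KeyedCoProcessFunction<String, IOffer, LMSData, String> {

    // Keyed state replaces the shared LRU cache: Flink scopes it to the
    // current member ID and checkpoints it, so no synchronization is needed.
    private transient ValueState<Subscriber> subscriberState;
    // Buffer for offers that arrive before the matching LMS record.
    private transient ListState<IOffer> pendingOffers;

    @Override
    public void open(Configuration parameters) {
        subscriberState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("subscriber", Subscriber.class));
        pendingOffers = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-offers", IOffer.class));
    }

    @Override
    public void processElement1(IOffer offer, Context ctx, Collector<String> out) throws Exception {
        Subscriber subscriber = subscriberState.value();
        if (subscriber == null) {
            // LMS data for this member has not arrived yet: buffer the offer
            // instead of dropping or overwriting anything.
            pendingOffers.add(offer);
        } else {
            subscriber.getOffers().add(offer);
            subscriberState.update(subscriber);
        }
    }

    @Override
    public void processElement2(LMSData lms, Context ctx, Collector<String> out) throws Exception {
        Subscriber subscriber = subscriberState.value();
        if (subscriber == null) {
            subscriber = new Subscriber();
            subscriber.setMemberId(lms.getMemberId());
            subscriber.setOffers(new ArrayList<>());
        }
        // Drain any offers that were buffered before this LMS record arrived.
        for (IOffer buffered : pendingOffers.get()) {
            subscriber.getOffers().add(buffered);
        }
        pendingOffers.clear();
        // ... do some calculations with lms ...
        subscriberState.update(subscriber);
    }
}

Because keyed state is scoped to the current key and checkpointed by Flink, there is no shared map and no locking, and the buffered offers survive failures along with the rest of the state.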

Upvotes: 2
