shruti
shruti

Reputation: 13

Working with TEDA 4.2 architecture.How to call LookUpCache operator in a loop?

Working with TEDA 4.2 architecture.How to call LookUpCache operator in a loop?

I have input as a1;b1;c1|a2;b2;c2|a3;b3;c3

Now I want to use a1 , a2 and a3 as look up key and generate a tuple for each (as output). I have extracted a1,a2,a3 using tokenizer but how to feed it to LookupCache operator so that it uses all keys one by one and generate 3 tuples.

Upvotes: 1

Views: 78

Answers (1)

Mark.Heger
Mark.Heger

Reputation: 3

Using the TEDA application framework, you need to prepare the TypesCommon.ReaderOutStreamType schema, which is the output of the parsing and the input to the data enrichment. It is extended with the attributes that are specified in the xxx.streams.custom::TypesCustom.LookupType type.

In xxx.streams.custom.TypesCustom.spl

    static LookupType = tuple<
        // ------------------------------------------------
        // custom code begin
        // ------------------------------------------------
        // add your custom attributes here
        // key attribute for LookupCache, e.g. a1, a2, a3 from input
        rstring a
        // ------------------------------------------------
        // custom code end
        // ------------------------------------------------
    >;

In xxx.chainprocessor.transformer.custom.DataProcessor.spl you can prepare extract the key attribute for your lookup, fill the a attribute and submit n tuples (LookupStream) to the LookupCache operator from a single input tuple (InRec)

    (stream<I> LookupStream as O) as PrepareLookup = Custom(InRec as I) {
        logic
            onTuple I: {
                mutable list<rstring> list1 = tokenize(attrInput, "|", true);
                for (rstring val in list1) {
                    mutable list<rstring> list2 = tokenize(val, ";", true);
                    I.a = list2[0];
                    submit(I, O); // submit for a1, a2 ...
                }   
            }
            onPunct I: {
                if (currentPunct() == Sys.WindowMarker) {
                    submit(Sys.WindowMarker, O);
                }
            }
    }


    stream<I> EnrichedRecords = LookupCache(LookupStream as I) {
        param
            segmentName:    "yourSeg"; // The name of the physical memory segment
            containerName:  "yourContainer"; // The name of the store
            useMutex:   false; // Don't protect data with mutex
            mutexName:  "";
            key:        I.a;
            keyType:        rstring;           // SPL type of key
            found:      I.lookupFound;     // attribute that takes lookup success
            valueType:  TypesCustom.YourType_t; //type of lookup table value as also defined in LookupManager
            disableLookup:  $disableLookup;   // always pass through this parameter
            applControlDir: $applControlDir;   // always pass through this parameter
    }

Upvotes: 0

Related Questions