py-r

Reputation: 451

Beam - Is schema-agnostic ingestion/aggregation possible?

I want to ingest a stream of objects (e.g. JSON) that have changing schemas, not known a priori, and apply a custom aggregation, known a priori.

Is this possible in Beam?

Specifically, can it:

  1. Ingest a list of (nested) JSON objects with changing schemas (in a PCollection):
    msg1 = {"product":"apple","price":{"currency":"JPY","amount":50}}
    msg2 = {"product":"apple","price":{"amount":70},"unuseful_field_for_this":"foo"}

  2. Apply custom aggregation over (global and updating) time window (in a CombineFn):
    res = {"product":"apple","sales":120,"currency":"JPY"} <= Using JPY as default

The code below shows a first attempt at being schema-agnostic by using tuples (with the CombinePerKey function proposed here), but it doesn't demonstrate a "richer" use case like the one above.

Note: the transformation is triggered by a dummy message sent separately to PubSub.

Code

# Libraries

import random
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

# Generator function

def create_random_record(line):

    n_fields = random.randint(1, 5)
    msg = {"key{}".format(i): i for i in range(1, n_fields + 1)}

    # One possible msg: {'key1': 1, 'key2': 2}
    # Another possible msg: {'key1': 1, 'key2': 2, 'key3': 3}

    return msg

class msg_to_tuple_list(beam.DoFn):

    def process(self, msg):
        return [(k,v) for k,v in msg.items()]
  
# Run function

def run(argv=None, save_main_session=True):

  pipeline_options = PipelineOptions(save_main_session=save_main_session, streaming=True)
  
  with beam.Pipeline(options=pipeline_options) as p:
      
    input_subscription=MY_INPUT_SUBSCRIPTION
    output_table=MY_OUTPUT_TABLE
    
    _ = (p
        | 'Trigger from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=input_subscription).with_output_types(bytes)
        | 'Random generator' >> beam.Map(create_random_record)
        | 'Convert dict to tuple list' >> beam.ParDo(msg_to_tuple_list())
        | 'Window' >> beam.WindowInto(window.FixedWindows(5))
        | 'Aggregation' >> beam.CombinePerKey(sum) # The function I'd like to accept any json..      
        )

# Run
run()

Output

('key1', 9)
('key2', 14)
('key3', 15)
('key4', 8)

Upvotes: 0

Views: 408

Answers (1)

Iñigo

Reputation: 2670

I have been working on this for some time; bear in mind that you will need to make some adjustments and test it on your side. Things to keep in mind:

  • For simplicity, the logic to find the price key is not in my example, but I had it in mind for the design; you would need to add it yourself (see the sketch right after this list).

  • Since we are going to be accumulating elements in a GlobalWindow, I am a bit afraid you may run into memory issues after some time. In theory, combiner lifting should make Dataflow store only the accumulators rather than all the elements. I ran a pipeline with a similar approach for 2 days with no issues, but in Java (it should behave the same).

  • Some extra logic would be needed to match your exact use case; take this as an idea rather than a finished solution.
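
For illustration, here is a minimal sketch of what that price-parsing logic could look like, using the JPY default from the question. The helper below is hypothetical (a stand-in for the _get_price stub in the combiner further down) and would need to be adapted to your real schemas:

    DEFAULT_CURRENCY = "JPY"  # assumption: fall back to JPY when no currency is present

    def get_price(record):
        # Hypothetical parsing logic: tolerate missing or extra fields.
        price = record.get("price", {})
        currency = price.get("currency", DEFAULT_CURRENCY)
        amount = price.get("amount", 0)  # treat a missing amount as 0
        # If currencies can differ, convert amount to DEFAULT_CURRENCY here
        # before summing; this sketch just returns the raw amount.
        return amount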

I'm using an advanced combiner (a custom CombineFn) so that (1) combiner lifting kicks in and (2) you have a place to add your key-parsing logic. I used a trigger of 5 minutes, so the total sum per key gets updated every 5 minutes (you can change the trigger if needed).

I'm going to start my code assuming you have already parsed the elements from your stream as we discussed in the comments. The elements entering my code are in this format:

        {"product": "apple", "price": {"currency": "JPY", "amount": 50}},
        {"product": "orange", "price": {"amount": 50}},
        {"product": "apple", "price": {"amount": 10}},
        {"product": "orange", "price": {"currency": "EUR", "amount": 50}},
        {"product": "apple", "price": {"currency": "JPY", "amount": 30}}

Those get passed to the pipeline:

    class NestedDictSum(beam.CombineFn):
        def create_accumulator(self):
            # accumulator instance starts at 0
            return 0

        def add_input(self, sum_value, input):
            # Called for every new element, add it to the accumulator
            return sum_value + self._get_price(input)

        def merge_accumulators(self, accumulators):
            # Called for every accumulator across workers / bundles
            return sum(accumulators)

        def extract_output(self, total):
            # output value from merged accumulators
            return {"sales": total, "currency": "JPY"}

        def _get_price(self, dictionary):
            # Add your logic to find the right key in it
            # Needs to return the parsed price (what you want to sum)
            return dictionary["price"]["amount"]

    def add_product(element):
        dictionary = element[1]
        dictionary["product"] = element[0]
        return dictionary

# Pipeline read stream and so on

     | Map(lambda x: (x["product"], x))  # To KV
     | WindowInto(GlobalWindows(),
                  trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5 * 60)),
                  # This makes the elements not be discarded, so the value would
                  # be updated as new elements are triggered
                  accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
     | CombinePerKey(NestedDictSum())
     | Map(add_product)  # From KV to Dictionary adding Key

The output of this is:

{'sales': 90, 'currency': 'JPY', 'product': 'apple'}
{'sales': 100, 'currency': 'JPY', 'product': 'orange'}

Note that in an actual streaming case, this value would get updated every 5 minutes.
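
If you want to sanity-check the combiner locally without a streaming source, a minimal bounded pipeline on the DirectRunner reproduces those totals (it assumes the NestedDictSum class and add_product function above; the windowing/trigger is dropped because a bounded Create is globally windowed by default):

    import apache_beam as beam

    sample = [
        {"product": "apple", "price": {"currency": "JPY", "amount": 50}},
        {"product": "orange", "price": {"amount": 50}},
        {"product": "apple", "price": {"amount": 10}},
        {"product": "orange", "price": {"currency": "EUR", "amount": 50}},
        {"product": "apple", "price": {"currency": "JPY", "amount": 30}},
    ]

    with beam.Pipeline() as p:
        _ = (p
             | 'Create test input' >> beam.Create(sample)
             | 'To KV' >> beam.Map(lambda x: (x["product"], x))
             | 'Sum per product' >> beam.CombinePerKey(NestedDictSum())
             | 'Re-attach key' >> beam.Map(add_product)
             | 'Print' >> beam.Map(print))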

Also, I think this particular use case may benefit from using a stateful and timely DoFn. That would give you finer-grained control over your elements and may be a better fit than what I posted (see the sketch below).
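
As a rough illustration of that alternative (not tested end to end; the state/timer names and the keying step are my own), a stateful DoFn with a processing-time timer could keep a running sum per product and emit it periodically. It expects KV pairs such as (product, amount):

    import apache_beam as beam
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import (
        CombiningValueStateSpec, TimerSpec, on_timer)
    from apache_beam.utils.timestamp import Duration, Timestamp

    class StatefulRunningSum(beam.DoFn):
        # Running total per key, kept in state instead of a window.
        SUM_STATE = CombiningValueStateSpec('running_sum', sum)
        # Processing-time timer used to emit the current total periodically.
        FLUSH_TIMER = TimerSpec('flush', TimeDomain.REAL_TIME)

        def process(self,
                    element,  # (product, amount)
                    sum_state=beam.DoFn.StateParam(SUM_STATE),
                    flush_timer=beam.DoFn.TimerParam(FLUSH_TIMER)):
            _, amount = element
            sum_state.add(amount)
            # Fire ~5 minutes after the most recent element
            # (the timer is re-armed on every element).
            flush_timer.set(Timestamp.now() + Duration(seconds=5 * 60))

        @on_timer(FLUSH_TIMER)
        def flush(self,
                  key=beam.DoFn.KeyParam,
                  sum_state=beam.DoFn.StateParam(SUM_STATE)):
            # Hardcoded JPY default, as in the combiner above.
            yield {"product": key, "sales": sum_state.read(), "currency": "JPY"}

    # Hypothetical usage, after parsing the stream:
    #   | beam.Map(lambda x: (x["product"], x["price"]["amount"]))
    #   | beam.ParDo(StatefulRunningSum())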

Upvotes: 1
