Jaango Jayaraj

Apache Beam: compare two datasets that have only one column

My task is to compare two datasets in Apache Beam, running on the Dataflow runner, and produce three outputs: elements common to both datasets, elements only in dataset1, and elements only in dataset2.
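To pin down what the three outputs should contain, here is the same comparison done with plain Python sets on the sample data from the question. This is a local, in-memory sketch only (not a Beam or Dataflow solution), and the names common, only_in_1 and only_in_2 are illustrative.

```python
# Sample data copied from the question.
dataset1 = ['data1a', 'data1b', 'data1c', 'data1d']
dataset2 = ['data2a', 'data2b', 'data1b', 'data2d']

set1, set2 = set(dataset1), set(dataset2)

common = sorted(set1 & set2)     # present in both datasets
only_in_1 = sorted(set1 - set2)  # present only in dataset1
only_in_2 = sorted(set2 - set1)  # present only in dataset2

print(common)     # ['data1b']
print(only_in_1)  # ['data1a', 'data1c', 'data1d']
print(only_in_2)  # ['data2a', 'data2b', 'data2d']
```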

I tried using CoGroupByKey, but I am not sure it can be used here, since each dataset is only a one-dimensional list. How can we compare these? What I tried was merging the two PCollections, as shown below:

import apache_beam as beam


with beam.Pipeline() as p:

  dataset1 = ['data1a', 'data1b', 'data1c', 'data1d']
  dataset2 = ['data2a', 'data2b', 'data1b', 'data2d']
  dataset1_pcoll = p | 'Read Dataset 1' >> beam.Create(dataset1)
  dataset2_pcoll = p | 'Read Dataset 2' >> beam.Create(dataset2)
  

  combined_data = (
      {
          'file1': dataset1_pcoll,
          'file2': dataset2_pcoll,
      }
      | 'CoGroup Files' >> beam.CoGroupByKey()
  )

I get the following error:

 wrapper = lambda x: [fn(*x)]
TypeError: <lambda>() takes 2 positional arguments but 6 were given [while running 'CoGroup Files/CoGroupByKeyImpl/Tag[file2]']
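Judging from the traceback, the Tag[file2] step applies a two-argument function through a wrapper of the form lambda x: [fn(*x)], which unpacks each element into positional arguments. CoGroupByKey expects (key, value) pairs, but here each element is a bare string, and a string such as 'data2a' unpacks into its six characters. The plain-Python sketch below reproduces that behavior; tag_value and call_like_map_tuple are hypothetical stand-ins, not Beam internals.

```python
def tag_value(k, v):
  # Hypothetical stand-in for the tagging step: expects exactly (key, value).
  return (k, ('file2', v))


def call_like_map_tuple(fn, element):
  # Mimics the wrapper shown in the traceback: the element is unpacked
  # into positional arguments, as in fn(*x).
  return fn(*element)


# A proper (key, value) pair works.
print(call_like_map_tuple(tag_value, ('data2a', 'data2a')))
# ('data2a', ('file2', 'data2a'))

# A bare string is unpacked into its six characters, so the call fails.
try:
  call_like_map_tuple(tag_value, 'data2a')
except TypeError as err:
  print(err)  # tag_value() takes 2 positional arguments but 6 were given
```

This is why the elements need to be turned into (key, value) pairs before they reach CoGroupByKey.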

   


Answers (1)

Jaango Jayaraj

I found that the following works, though I'm not sure whether there are better ways. I had to duplicate each element into a (key, value) pair so that it could serve as its own key, and with that I got the desired output. Now I can tag the combined output and emit different PCollections.

import apache_beam as beam


def make_kv_pair(x):
  """Duplicate the element into a (key, value) pair, e.g. 'data1b' -> ('data1b', 'data1b').

  Is this a good practice?
  """
  return (x, x)


with beam.Pipeline() as p:

  dataset1 = ['data1a', 'data1b', 'data1c', 'data1d']
  dataset2 = ['data2a', 'data2b', 'data1b', 'data2d']
  dataset1_pcoll = (p | 'Read Dataset 1' >> beam.Create(dataset1)
                      | 'doubledata1' >> beam.Map(make_kv_pair))
  dataset2_pcoll = (p | 'Read Dataset 2' >> beam.Create(dataset2)
                      | 'doubledata2' >> beam.Map(make_kv_pair))

  # Each grouped element looks like (key, {'dataset1': [...], 'dataset2': [...]}).
  combined_data = (
      {
          'dataset1': dataset1_pcoll,
          'dataset2': dataset2_pcoll,
      }
      | 'Group datasets' >> beam.CoGroupByKey()
      | 'printing combined' >> beam.Map(print)
  )
   
