crbl

Reputation: 397

Apache Beam and SqlTransform: Converting data types from dict not working

In a Dataflow / Apache Beam pipeline (Python SDK), I read data from BigQuery and use SqlTransform to generate some new features.

I run into a problem when I try to convert the data types via a dictionary so that they can be processed by SqlTransform in the next step. When I convert the type of each ingested field manually, everything works fine, but I am looking for a more elegant way to do the conversion. What is strange is that the output of the beam.Row conversion is identical in both cases, yet SqlTransform only executes successfully after the manual conversion. Irrespective of the method I use, after the "Convert to Rows" step I get the same output (as expected):

Row(DATE_ID='2022-07-31', SUBS_ID=....., SUBS_SEGMENT='ABC3', OOB_VOI_EUR_SUM_3M=0.0, OOB_SMS_EUR_SUM_3M=0.0, OOB_DATA_EUR_SUM_3M=0.0, PROFILE_HANDSET_5G_FLG=0, key=-590511471006032596)

My question is: why does only the manual data type conversion work, and is there a solution that does not require defining the conversion field by field? I do not know what I am missing.

Here is the code:

data_types = {
    'DATE_ID': str,
    'SUBS_ID': int,
    'SUBS_SEGMENT': str,
    'OOB_VOI_EUR_SUM_3M': float,
    'OOB_SMS_EUR_SUM_3M': float,
    'OOB_DATA_EUR_SUM_3M': float,
    'PROFILE_HANDSET_5G_FLG': int,
    'key': int
}

class ConvertToRow(beam.DoFn):
    
    def __init__(self, data_types):
        self.data_types = data_types
        
    def unpack_values(self, element):
        yield {k: self.data_types[k](v) for k, v in element.items()}
    
    def process(self, element):
        ### ---> this is NOT working
        # yield beam.Row(**{k: self.data_types[k](v) for k, v in element.items()})
        ### ---> this is OK
        yield beam.Row(DATE_ID=str(element['DATE_ID']),
                       SUBS_ID=int(element['SUBS_ID']),
                       SUBS_SEGMENT=str(element['SUBS_SEGMENT']),
                       OOB_VOI_EUR_SUM_3M=float(element['OOB_VOI_EUR_SUM_3M']),
                       OOB_SMS_EUR_SUM_3M=float(element['OOB_SMS_EUR_SUM_3M']),
                       OOB_DATA_EUR_SUM_3M=float(element['OOB_DATA_EUR_SUM_3M']),
                       PROFILE_HANDSET_5G_FLG=int(element['PROFILE_HANDSET_5G_FLG']),
                       key=int(element['key']))
        ### ---> this is NOT working
        # yield beam.Row(**self.unpack_values(element))
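(Edit: independent of Beam, the last commented-out variant fails for a simpler reason: `unpack_values` is a generator function, and a generator cannot be `**`-unpacked. The dict comprehension itself converts the values exactly as the manual version does, which matches the observation that the printed rows are identical. A minimal stand-alone check, using a made-up subset of the fields:)

```python
# Sample subset of data_types / element, with string inputs as they might
# arrive from an earlier step (values here are illustrative only).
data_types = {'SUBS_ID': int, 'OOB_VOI_EUR_SUM_3M': float}
element = {'SUBS_ID': '42', 'OOB_VOI_EUR_SUM_3M': '0.0'}

# The dict comprehension converts every field correctly...
converted = {k: data_types[k](v) for k, v in element.items()}
assert converted == {'SUBS_ID': 42, 'OOB_VOI_EUR_SUM_3M': 0.0}


def unpack_values(element):
    # Generator function, mirroring ConvertToRow.unpack_values.
    yield {k: data_types[k](v) for k, v in element.items()}


# ...but a generator is not a mapping, so **-unpacking it raises TypeError,
# which is why beam.Row(**self.unpack_values(element)) cannot work.
try:
    dict(**unpack_values(element))
    generator_unpack_failed = False
except TypeError:
    generator_unpack_failed = True

assert generator_unpack_failed
```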

The pipeline code:

with beam.Pipeline(runner, options=pipeline_options) as pipeline:
    logging.info(f'pipeline_options: {pipeline_options}')
    logging.getLogger().setLevel(logging.INFO)
    
    # Preprocess train data
    step = 'train'
    # Read raw train data from BQ
    raw_train_dataset = read_from_bq(pipeline, step, data_size) 
    

    rows_train_dataset = raw_train_dataset[0] | 'Convert to Rows' >> beam.ParDo(ConvertToRow(data_types))
    rows_train_dataset | beam.Map(print)
    
    # Apply the SQL transform
    filtered_rows = rows_train_dataset | SqlTransform(windowing_query)

    # Print the results
    filtered_rows | 'Print results' >> beam.Map(print)

The error:

/beam_env/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
    213   def apply_PTransform(self, transform, input, options):
    214     # The base case of apply is to call the transform's expand.
--> 215     return transform.expand(input)
    216 
    217   def run_transform(self, 

~/beam_env/lib/python3.7/site-packages/apache_beam/transforms/external.py in expand(self, pvalueish)
    603       response = service.Expand(request)
    604       if response.error:
--> 605         raise RuntimeError(response.error)
    606       self._expanded_components = response.components
    607       if any(env.dependencies

RuntimeError: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:

java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1, beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1, beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1, beam:coder:iterable:v1, beam:coder:timer:v1, beam:coder:length_prefix:v1, beam:coder:global_window:v1, beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1, beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1, beam:coder:custom_window:v1, beam:coder:nullable:v1]

Upvotes: 0

Views: 120

Answers (0)
