Reputation: 397
In a Dataflow / Apache Beam pipeline (Python SDK), I am reading data from BigQuery and using SqlTransform to generate some new features.
I run into a problem when I try to convert the data types using a dictionary so that the data can be processed by SqlTransform in the next step. When I manually convert the type of each ingested feature it works fine, but I am looking for a more elegant way to convert the data types. What is strange is that I verified that the output of the beam.Row conversion is identical in both cases, yet only after the manual conversion does SqlTransform execute successfully. Regardless of the method I use, after the "Convert to Rows" step I get the same output (as expected):
Row(DATE_ID='2022-07-31', SUBS_ID=....., SUBS_SEGMENT='ABC3', OOB_VOI_EUR_SUM_3M=0.0, OOB_SMS_EUR_SUM_3M=0.0, OOB_DATA_EUR_SUM_3M=0.0, PROFILE_HANDSET_5G_FLG=0, key=-590511471006032596)
The question is why only the manual data type conversion works, and whether there is a solution that does not require defining each conversion one by one. I do not know what I am missing.
Here is the code:
data_types = {
    'DATE_ID': str,
    'SUBS_ID': int,
    'SUBS_SEGMENT': str,
    'OOB_VOI_EUR_SUM_3M': float,
    'OOB_SMS_EUR_SUM_3M': float,
    'OOB_DATA_EUR_SUM_3M': float,
    'PROFILE_HANDSET_5G_FLG': int,
    'key': int
}
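For the record, outside of Beam the dictionary-driven cast produces exactly the same values and types as the manual one (a minimal check with two of the fields):

```python
data_types = {'SUBS_ID': int, 'OOB_VOI_EUR_SUM_3M': float}

element = {'SUBS_ID': '123', 'OOB_VOI_EUR_SUM_3M': '0.0'}

# Dictionary-driven conversion
dynamic = {k: data_types[k](v) for k, v in element.items()}

# Manual field-by-field conversion
manual = {'SUBS_ID': int(element['SUBS_ID']),
          'OOB_VOI_EUR_SUM_3M': float(element['OOB_VOI_EUR_SUM_3M'])}

assert dynamic == manual  # identical values and types
```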
class ConvertToRow(beam.DoFn):
    def __init__(self, data_types):
        self.data_types = data_types

    def unpack_values(self, element):
        yield {k: self.data_types[k](v) for k, v in element.items()}

    def process(self, element):
        ### ---> this is NOT working
        # yield beam.Row(**{k: self.data_types[k](v) for k, v in element.items()})

        ### ---> this is OK
        yield beam.Row(
            DATE_ID=str(element['DATE_ID']),
            SUBS_ID=int(element['SUBS_ID']),
            SUBS_SEGMENT=str(element['SUBS_SEGMENT']),
            OOB_VOI_EUR_SUM_3M=float(element['OOB_VOI_EUR_SUM_3M']),
            OOB_SMS_EUR_SUM_3M=float(element['OOB_SMS_EUR_SUM_3M']),
            OOB_DATA_EUR_SUM_3M=float(element['OOB_DATA_EUR_SUM_3M']),
            PROFILE_HANDSET_5G_FLG=int(element['PROFILE_HANDSET_5G_FLG']),
            key=int(element['key']))

        ### ---> this is NOT working
        # yield beam.Row(**self.unpack_values(element))
The pipeline code:
with beam.Pipeline(runner, options=pipeline_options) as pipeline:
    logging.info(f'pipeline_options: {pipeline_options}')
    logging.getLogger().setLevel(logging.INFO)

    # Preprocess train data
    step = 'train'

    # Read raw train data from BQ
    raw_train_dataset = read_from_bq(pipeline, step, data_size)
    rows_train_dataset = raw_train_dataset[0] | 'Convert to Rows' >> beam.ParDo(ConvertToRow(data_types))
    rows_train_dataset | beam.Map(print)

    # Apply the SQL transform
    filtered_rows = rows_train_dataset | SqlTransform(windowing_query)

    # Print the results
    filtered_rows | 'Print results' >> beam.Map(print)
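One direction I am considering, though I am not sure it is the idiomatic fix, is deriving an explicit NamedTuple type from the same data_types dict, since Beam can infer a row schema from NamedTuple output types instead of falling back to pickling. The names RowSchema and to_typed_row below are mine:

```python
from typing import NamedTuple

# Same mapping as above (shortened here for illustration).
data_types = {
    'DATE_ID': str,
    'SUBS_ID': int,
    'OOB_VOI_EUR_SUM_3M': float,
}

# Build an explicit NamedTuple type from the dict, so the output
# schema is declared once rather than defined field by field.
RowSchema = NamedTuple('RowSchema', list(data_types.items()))

def to_typed_row(element):
    # Cast each field with the callable from data_types, as before.
    return RowSchema(**{k: data_types[k](v) for k, v in element.items()})

# In the pipeline this would replace the ParDo, e.g.:
# rows = raw_train_dataset[0] | beam.Map(to_typed_row).with_output_types(RowSchema)
```

I have only verified the plain-Python part of this; whether SqlTransform then accepts the schema is exactly what I am unsure about.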
The error:
/beam_env/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
213 def apply_PTransform(self, transform, input, options):
214 # The base case of apply is to call the transform's expand.
--> 215 return transform.expand(input)
216
217 def run_transform(self,
~/beam_env/lib/python3.7/site-packages/apache_beam/transforms/external.py in expand(self, pvalueish)
603 response = service.Expand(request)
604 if response.error:
--> 605 raise RuntimeError(response.error)
606 self._expanded_components = response.components
607 if any(env.dependencies
RuntimeError: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1, beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1, beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1, beam:coder:iterable:v1, beam:coder:timer:v1, beam:coder:length_prefix:v1, beam:coder:global_window:v1, beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1, beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1, beam:coder:custom_window:v1, beam:coder:nullable:v1]
Upvotes: 0
Views: 120