Jash Shah

Reputation: 2164

TypeError: can't pickle PyCapsule objects when using tf.transform and Apache Beam

I am new to Apache Beam and am trying to run the following code:

import math
import os
import pprint
import time
import pathlib
import tempfile

import tensorflow as tf
import apache_beam as beam

import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

from tfx_bsl.public import tfxio
from tfx_bsl.coders.example_coder import RecordBatchToExamples

from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

from sklearn.preprocessing import scale

train = 'gs://path/to/adult.data'
test = 'gs://path/to/adult.test'

CATEGORICAL_FEATURE_KEYS = [
    'workclass',
    'education',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'native-country',
]
NUMERIC_FEATURE_KEYS = [
    'age',
    'capital-gain',
    'capital-loss',
    'hours-per-week',
]
OPTIONAL_NUMERIC_FEATURE_KEYS = [
    'education-num',
]
ORDERED_CSV_COLUMNS = [
    'age', 'workclass', 'fnlwgt', 'education', 'education-num',
    'marital-status', 'occupation', 'relationship', 'race', 'sex',
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'label'
]
LABEL_KEY = 'label'

RAW_DATA_FEATURE_SPEC = dict(
    [(name, tf.io.FixedLenFeature([], tf.string))
     for name in CATEGORICAL_FEATURE_KEYS] +
    [(name, tf.io.FixedLenFeature([], tf.float32))
     for name in NUMERIC_FEATURE_KEYS] +
    [(name, tf.io.VarLenFeature(tf.float32))
     for name in OPTIONAL_NUMERIC_FEATURE_KEYS] +
    [(LABEL_KEY, tf.io.FixedLenFeature([], tf.string))]
)

SCHEMA = tft.tf_metadata.dataset_metadata.DatasetMetadata(
    tft.tf_metadata.schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC)).schema

testing = False
NUM_OOV_BUCKETS = 1
if testing:
  TRAIN_NUM_EPOCHS = 1
  NUM_TRAIN_INSTANCES = 1
  TRAIN_BATCH_SIZE = 1
  NUM_TEST_INSTANCES = 1
else:
  TRAIN_NUM_EPOCHS = 16
  NUM_TRAIN_INSTANCES = 32561
  TRAIN_BATCH_SIZE = 128
  NUM_TEST_INSTANCES = 16281

# Names of temp files
TRANSFORMED_TRAIN_DATA_FILEBASE = 'train_transformed'
TRANSFORMED_TEST_DATA_FILEBASE = 'test_transformed'
EXPORTED_MODEL_DIR = 'exported_model_dir'

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  outputs = inputs.copy()
  
  def sk_scale_to_z_score(X):
    # Standardize with sklearn: zero mean, unit variance.
    scaled = scale(X)
    return scaled

  def tf_scale_to_z_score(X):
    # Wrap the sklearn call in tf.py_function so it can be applied to tensors.
    X_shape = X.shape
    [X_scaled, ] = tf.py_function(sk_scale_to_z_score, [X], [tf.float32])
    X_scaled.set_shape(X_shape)
    return X_scaled
    
  # Scale numeric columns to z-scores (instead of tft.scale_to_0_1).
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tf_scale_to_z_score(inputs[key])  # was: tft.scale_to_0_1(inputs[key])

  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    sparse = tf.sparse.SparseTensor(inputs[key].indices, inputs[key].values,
                                    [inputs[key].dense_shape[0], 1])
    dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch of scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  for key in CATEGORICAL_FEATURE_KEYS:
    outputs[key] = tft.compute_and_apply_vocabulary(
        tf.strings.strip(inputs[key]),
        num_oov_buckets=NUM_OOV_BUCKETS,
        vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  table_keys = ['>50K', '<=50K']
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=table_keys,
      values=tf.cast(tf.range(len(table_keys)), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)
  # Remove trailing periods for test data when the data is read with tf.data.
  label_str = tf.strings.regex_replace(inputs[LABEL_KEY], r'\.', '')
  label_str = tf.strings.strip(label_str)
  data_labels = table.lookup(label_str)
  transformed_label = tf.one_hot(
      indices=data_labels, depth=len(table_keys), on_value=1.0, off_value=0.0)
  outputs[LABEL_KEY] = tf.reshape(transformed_label, [-1, len(table_keys)])

  return outputs

def transform_data(train_data_file, test_data_file, working_dir, options):
  """Transform the data and write out as a TFRecord of Example protos."""

  with beam.Pipeline(options=options) as pipeline:
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
      csv_tfxio = tfxio.BeamRecordCsvTFXIO(
          physical_format='text',
          column_names=ORDERED_CSV_COLUMNS,
          schema=SCHEMA)

      raw_data = (
          pipeline
          | 'ReadTrainData' >> beam.io.ReadFromText(
              train_data_file, coder=beam.coders.BytesCoder())
          | 'FixCommasTrainData' >> beam.Map(
              lambda line: line.replace(b', ', b','))
          | 'DecodeTrainData' >> csv_tfxio.BeamSource())

      raw_dataset = (raw_data, csv_tfxio.TensorAdapterConfig())

      # The TFXIO output format is chosen for improved performance.
      transformed_dataset, transform_fn = (
          raw_dataset | tft_beam.AnalyzeAndTransformDataset(
              preprocessing_fn, output_record_batches=True))

      # Transformed metadata is not necessary for encoding.
      transformed_data, _ = transformed_dataset

      # Extract transformed RecordBatches, encode and write them to the given
      # directory.
      _ = (
          transformed_data
          | 'EncodeTrainData' >>
          beam.FlatMapTuple(lambda batch, _: RecordBatchToExamples(batch))
          | 'WriteTrainData' >> beam.io.WriteToTFRecord(
              os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE)))

      # Will write a SavedModel and metadata to working_dir, which can then
      # be read by the tft.TFTransformOutput class.
      _ = (
          transform_fn
          | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))

def run(run_local):
    
    now = time.strftime("%Y%m%d-%H%M%S")
    temp = os.path.join('gs://test_bucket/test_dataflow', 'transformed_data', f'data-{now}')
    
    if run_local:
        # Execute pipeline in your local machine.
        runner_options = {
            "runner": "DirectRunner",
        }
    else:
        runner_options = {
            "runner": "DataflowRunner",
            "temp_location": os.path.join(temp, "temp_location"),
            "staging_location": os.path.join(temp, "staging_location"),
            "max_num_workers": 6,
            "num_workers": 1
        }
    
    options = PipelineOptions(
        project='project_id',
        job_name='dataflow-job-test' + f"-{now}",
        region='us-west1',
        **runner_options
    )
    
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(SetupOptions).setup_file = os.path.join(
        pathlib.Path(__file__).parent.absolute(), "setup.py")
    
    
    transform_data(train, test, temp, options)
    

if __name__ == "__main__":
    run_local = True
    run(run_local)

This code is based on the TensorFlow documentation mentioned here: Preprocessing data with TensorFlow Transform.

However, when running with the DirectRunner, I keep hitting the following error:

TypeError: can't pickle PyCapsule objects

I can't make head nor tail of it. I checked out this link as well, but the author of the library himself says he can't help there. The notebook given in the TensorFlow documentation runs fine when I try it.

Any help towards resolving this will be appreciated.

Upvotes: 0

Views: 439

Answers (1)

robertwb

Reputation: 5104

I'm guessing you accidentally have a PyCapsule object in your main module due to one of your imports. Try running without save_main_session. (If you need some of the globals, move your pipeline code to a "real" module and only import/invoke run from the main module.)
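To make the second suggestion concrete, here is a minimal sketch of that layout (the file names my_pipeline.py and main.py are just placeholders, and the pipeline body is a stand-in for your transform_data code): everything import-heavy lives in a packaged module, the entry script only imports and calls run, and save_main_session is never set, so Beam doesn't try to pickle the globals of __main__.

# my_pipeline.py -- hypothetical module name, installed on the workers via
# your setup.py; all heavy imports (tensorflow, tft, sklearn, ...) and the
# preprocessing_fn / transform_data / run code from the question go here.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(run_local=True):
    options = PipelineOptions(
        runner="DirectRunner" if run_local else "DataflowRunner")
    # Note: no options.view_as(SetupOptions).save_main_session = True here,
    # so nothing from the __main__ module has to be pickled.
    with beam.Pipeline(options=options) as pipeline:
        _ = (
            pipeline
            | beam.Create(["a", "b"])   # stand-in for the real read/transform steps
            | beam.Map(str.upper))


# main.py -- thin entry point that stays import-light and only invokes run().
#
# from my_pipeline import run
#
# if __name__ == "__main__":
#     run(run_local=True)

If you would rather keep the single-file layout, the quicker thing to try is the first suggestion: delete the save_main_session line from run() and re-run.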

Upvotes: 1
