Reputation: 2164
I am new to Apache Beam and am trying to run the following code:
import math
import os
import pprint
import time
import pathlib
import tempfile
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tfx_bsl.public import tfxio
from tfx_bsl.coders.example_coder import RecordBatchToExamples
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from sklearn.preprocessing import scale
train = 'gs://path/to/adult.data'
test = 'gs://path/to/adult.test'
CATEGORICAL_FEATURE_KEYS = [
    'workclass',
    'education',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'native-country',
]
NUMERIC_FEATURE_KEYS = [
    'age',
    'capital-gain',
    'capital-loss',
    'hours-per-week',
]
OPTIONAL_NUMERIC_FEATURE_KEYS = [
    'education-num',
]
ORDERED_CSV_COLUMNS = [
    'age', 'workclass', 'fnlwgt', 'education', 'education-num',
    'marital-status', 'occupation', 'relationship', 'race', 'sex',
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'label'
]
LABEL_KEY = 'label'
RAW_DATA_FEATURE_SPEC = dict(
    [(name, tf.io.FixedLenFeature([], tf.string))
     for name in CATEGORICAL_FEATURE_KEYS] +
    [(name, tf.io.FixedLenFeature([], tf.float32))
     for name in NUMERIC_FEATURE_KEYS] +
    [(name, tf.io.VarLenFeature(tf.float32))
     for name in OPTIONAL_NUMERIC_FEATURE_KEYS] +
    [(LABEL_KEY, tf.io.FixedLenFeature([], tf.string))]
)
SCHEMA = tft.tf_metadata.dataset_metadata.DatasetMetadata(
    tft.tf_metadata.schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC)).schema
testing = False
NUM_OOV_BUCKETS = 1
if testing:
    TRAIN_NUM_EPOCHS = 1
    NUM_TRAIN_INSTANCES = 1
    TRAIN_BATCH_SIZE = 1
    NUM_TEST_INSTANCES = 1
else:
    TRAIN_NUM_EPOCHS = 16
    NUM_TRAIN_INSTANCES = 32561
    TRAIN_BATCH_SIZE = 128
    NUM_TEST_INSTANCES = 16281
# Names of temp files
TRANSFORMED_TRAIN_DATA_FILEBASE = 'train_transformed'
TRANSFORMED_TEST_DATA_FILEBASE = 'test_transformed'
EXPORTED_MODEL_DIR = 'exported_model_dir'
def preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    outputs = inputs.copy()

    def sk_scale_to_z_score(X):
        scaled = scale(X)
        return scaled

    def tf_scale_to_z_score(X):
        X_shape = X.shape
        [X_scaled, ] = tf.py_function(sk_scale_to_z_score, [X], [tf.float32])
        X_scaled.set_shape(X_shape)
        return X_scaled

    # Scale numeric columns to z-scores.
    for key in NUMERIC_FEATURE_KEYS:
        outputs[key] = tf_scale_to_z_score(inputs[key])  # tft.scale_to_0_1(inputs[key])
    for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
        sparse = tf.sparse.SparseTensor(inputs[key].indices, inputs[key].values,
                                        [inputs[key].dense_shape[0], 1])
        dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
        # Reshaping from a batch of vectors of size 1 to a batch of scalars.
        dense = tf.squeeze(dense, axis=1)
        outputs[key] = tft.scale_to_0_1(dense)
    for key in CATEGORICAL_FEATURE_KEYS:
        outputs[key] = tft.compute_and_apply_vocabulary(
            tf.strings.strip(inputs[key]),
            num_oov_buckets=NUM_OOV_BUCKETS,
            vocab_filename=key)
    # For the label column we provide the mapping from string to index.
    table_keys = ['>50K', '<=50K']
    initializer = tf.lookup.KeyValueTensorInitializer(
        keys=table_keys,
        values=tf.cast(tf.range(len(table_keys)), tf.int64),
        key_dtype=tf.string,
        value_dtype=tf.int64)
    table = tf.lookup.StaticHashTable(initializer, default_value=-1)
    # Remove trailing periods for test data when the data is read with tf.data.
    label_str = tf.strings.regex_replace(inputs[LABEL_KEY], r'\.', '')
    label_str = tf.strings.strip(label_str)
    data_labels = table.lookup(label_str)
    transformed_label = tf.one_hot(
        indices=data_labels, depth=len(table_keys), on_value=1.0, off_value=0.0)
    outputs[LABEL_KEY] = tf.reshape(transformed_label, [-1, len(table_keys)])
    return outputs
def transform_data(train_data_file, test_data_file, working_dir, options):
    """Transform the data and write out as a TFRecord of Example protos."""
    with beam.Pipeline(options=options) as pipeline:
        with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
            csv_tfxio = tfxio.BeamRecordCsvTFXIO(
                physical_format='text',
                column_names=ORDERED_CSV_COLUMNS,
                schema=SCHEMA)
            raw_data = (
                pipeline
                | 'ReadTrainData' >> beam.io.ReadFromText(
                    train_data_file, coder=beam.coders.BytesCoder())
                | 'FixCommasTrainData' >> beam.Map(
                    lambda line: line.replace(b', ', b','))
                | 'DecodeTrainData' >> csv_tfxio.BeamSource())
            raw_dataset = (raw_data, csv_tfxio.TensorAdapterConfig())
            # The TFXIO output format is chosen for improved performance.
            transformed_dataset, transform_fn = (
                raw_dataset | tft_beam.AnalyzeAndTransformDataset(
                    preprocessing_fn, output_record_batches=True))
            # Transformed metadata is not necessary for encoding.
            transformed_data, _ = transformed_dataset
            # Extract transformed RecordBatches, encode and write them to the
            # given directory.
            _ = (
                transformed_data
                | 'EncodeTrainData' >>
                beam.FlatMapTuple(lambda batch, _: RecordBatchToExamples(batch))
                | 'WriteTrainData' >> beam.io.WriteToTFRecord(
                    os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE)))
            # Will write a SavedModel and metadata to working_dir, which can then
            # be read by the tft.TFTransformOutput class.
            _ = (
                transform_fn
                | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))
def run(run_local):
    now = time.strftime("%Y%m%d-%H%M%S")
    temp = os.path.join('gs://test_bucket/test_dataflow', 'transformed_data', f'data-{now}')
    if run_local:
        # Execute pipeline on your local machine.
        runner_options = {
            "runner": "DirectRunner",
        }
    else:
        runner_options = {
            "runner": "DataflowRunner",
            "temp_location": os.path.join(temp, "temp_location"),
            "staging_location": os.path.join(temp, "staging_location"),
            "max_num_workers": 6,
            "num_workers": 1
        }
    options = PipelineOptions(
        project='project_id',
        job_name='dataflow-job-test' + f"-{now}",
        region='us-west1',
        **runner_options
    )
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(SetupOptions).setup_file = os.path.join(
        pathlib.Path(__file__).parent.absolute(), "setup.py")
    transform_data(train, test, temp, options)


if __name__ == "__main__":
    run_local = True
    run(run_local)
This is part of the TensorFlow documentation, as mentioned here: Preprocessing data with TensorFlow Transform.
However, when trying to use the DirectRunner, I keep running into the following error:
TypeError: can't pickle PyCapsule objects
I can't make head or tail of it. I checked out this link as well, but the author of the library himself says he can't help. I have tried running the notebook given in the TensorFlow documentation, and the notebook works.
Any help towards resolving this will be appreciated.
Upvotes: 0
Views: 439
Reputation: 5104
I'm guessing you accidentally have a PyCapsule object in your main module due to one of your imports. Try running without save_main_session. (If you need some of the globals, move your pipeline code to a "real" module and only import/invoke run from the main module.)
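As a rough illustration (the file names below are hypothetical, not from your project), the layout could look like this, with the save_main_session line removed:

# census_pipeline.py (hypothetical module name): contains everything currently in
# your script -- imports, constants, preprocessing_fn, transform_data and run --
# but with the line
#     options.view_as(SetupOptions).save_main_session = True
# removed, so Beam does not try to pickle the main session.

# main.py (hypothetical entry point): nothing pipeline-related is defined here,
# so there is nothing unpicklable left in __main__.
from census_pipeline import run

if __name__ == "__main__":
    run(run_local=True)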
Upvotes: 1