gagan malhotra

Reputation: 168

Error while using TensorFlow Transform's WriteTransformFn function

I'm currently using the TensorFlow Transform library to convert and save transformations. It used to work just fine, but currently I'm facing an issue similar to the one below.

I keep getting the same error:

'BeamDatasetMetadata' object has no attribute 'schema' [while running 'AnalyzeAndTransformDataset/TransformDataset/ConvertAndUnbatch']

Is anyone familiar with the error above, and how can it be resolved?

My transform function looks like this:

# ### Transformation Function
def transform_data(train_data_file, test_data_file, working_dir):
  """Transform the data and write out as a TFRecord of Example protos.
  Read in the data using the CSV reader, and transform it using a
  preprocessing pipeline that scales numeric data and converts categorical data
  from strings to int64 indices by creating a vocabulary for each
  category.
  Args:
    train_data_file: File containing training data
    test_data_file: File containing test data
    working_dir: Directory to write transformed data and metadata to
  """

  def preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    outputs = {}

    # Scale numeric columns to have range [0, 1].
    for key in NUMERIC_FEATURE_KEYS:
      outputs[key] = tft.scale_to_0_1(inputs[key])

    # For all categorical columns except the label column, we use
    # tft.string_to_int, which computes the set of unique values and uses it
    # to convert the strings to int64 indices.
    for key in CATEGORICAL_FEATURE_KEYS:
      outputs[key] = tft.string_to_int(inputs[key], vocab_filename=key)

    """ We would use the lookup table when the label is a string value
        In our case here Creative_id = 0/1 so we can direclty assign output as is
    """
    outputs[LABEL_KEY] = inputs[LABEL_KEY]

    return outputs

  # The "with" block will create a pipeline, and run that pipeline at the exit
  # of the block.
  with beam.Pipeline() as pipeline:
    with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
      # Create a coder to read the data with the schema.  To do this we
      # need to list all columns in order since the schema doesn't specify the
      # order of columns in the csv.

      ordered_columns = [
          'app_category', 'connection_type', 'creative_id', 'day_of_week',
          'device_size', 'geo', 'hour_of_day', 'num_of_connects',
          'num_of_conversions', 'opt_bid', 'os_version'
      ]
      converter = csv_coder.CsvCoder(ordered_columns, RAW_DATA_METADATA.schema)

      # Read in raw data and convert using CSV converter.  Note that we apply
      # some Beam transformations here, which will not be encoded in the TF
      # graph since we don't do them from within tf.Transform's methods
      # (AnalyzeDataset, TransformDataset etc.).  These transformations are just
      # to get data into a format that the CSV converter can read, in particular
      # removing empty lines and removing spaces after commas.
      raw_data = (
          pipeline
          | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
          | 'FilterTrainData' >> beam.Filter(
              lambda line: line and line != 'app_category,connection_type,creative_id,day_of_week,device_size,geo,hour_of_day,num_of_connects,num_of_conversions,opt_bid,os_version')
          | 'FixCommasTrainData' >> beam.Map(
              lambda line: line.replace(', ', ','))
          | 'DecodeTrainData' >> MapAndFilterErrors(converter.decode))

      # Combine data and schema into a dataset tuple.  Note that we already used
      # the schema to read the CSV data, but we also need it to interpret
      # raw_data.

      raw_dataset = (raw_data, RAW_DATA_METADATA)
      transformed_dataset, transform_fn = (
          raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))

      transformed_data, transformed_metadata = transformed_dataset

      transformed_data_coder = example_proto_coder.ExampleProtoCoder(transformed_metadata.schema)

      _ = (
          transformed_data
          | 'EncodeTrainData' >> beam.Map(transformed_data_coder.encode)
          | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
              os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE)))

      # Now apply the transform function to the test data.  In this case we
      # also skip the header line of the CSV file.
      raw_test_data = (
          pipeline
          | 'ReadTestData' >> textio.ReadFromText(test_data_file, skip_header_lines=1)
          | 'FixCommasTestData' >> beam.Map(
              lambda line: line.replace(', ', ','))
          | 'DecodeTestData' >> beam.Map(converter.decode))

      raw_test_dataset = (raw_test_data, RAW_DATA_METADATA)

      transformed_test_dataset = ((raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
      # Don't need transformed data schema, it's the same as before.
      transformed_test_data, _ = transformed_test_dataset

      _ = (
          transformed_test_data
          | 'EncodeTestData' >> beam.Map(transformed_data_coder.encode)
          | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
              os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE)))

      _ = (
          transform_fn
          | 'WriteTransformFn' >>
          transform_fn_io.WriteTransformFn(working_dir))
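
For reference, the globals used above (NUMERIC_FEATURE_KEYS, CATEGORICAL_FEATURE_KEYS, LABEL_KEY, RAW_DATA_METADATA) are defined elsewhere in my script; a minimal sketch is below. The numeric/categorical split and the dtypes shown are assumptions based on the column names, not necessarily the real definitions.

# Hypothetical definitions for the globals the snippet relies on; the split
# into numeric vs. categorical columns and the dtypes are assumptions.
import tensorflow as tf
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema

CATEGORICAL_FEATURE_KEYS = ['app_category', 'connection_type', 'day_of_week',
                            'device_size', 'geo', 'os_version']
NUMERIC_FEATURE_KEYS = ['hour_of_day', 'num_of_connects',
                        'num_of_conversions', 'opt_bid']
LABEL_KEY = 'creative_id'

RAW_DATA_FEATURE_SPEC = dict(
    [(name, tf.FixedLenFeature([], tf.string))
     for name in CATEGORICAL_FEATURE_KEYS] +
    [(name, tf.FixedLenFeature([], tf.float32))
     for name in NUMERIC_FEATURE_KEYS] +
    [(LABEL_KEY, tf.FixedLenFeature([], tf.int64))])

RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE_SPEC))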

Output of:

pip show tensorflow-transform apache-beam

Name: tensorflow-transform
Version: 0.4.0
Summary: A library for data preprocessing with TensorFlow
Home-page: UNKNOWN
Author: Google Inc.
Author-email: tf-transform-feedback@google.com
License: Apache 2.0
Location: /usr/local/lib/python2.7/dist-packages
Requires: six, apache-beam, protobuf
---
Name: apache-beam
Version: 2.4.0
Summary: Apache Beam SDK for Python
Home-page: https://beam.apache.org
Author: Apache Software Foundation
Author-email: dev@beam.apache.org
License: Apache License, Version 2.0
Location: /usr/local/lib/python2.7/dist-packages
Requires: oauth2client, httplib2, mock, crcmod, grpcio, futures, pyvcf, avro, typing, pyyaml, dill, six, hdfs, protobuf

The issue above doesn't seem to occur all the time, though; it looks like there is some conflict with other packages.
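
To rule out version drift, here is a quick check I can run in the same interpreter that executes the pipeline; pkg_resources is standard setuptools, nothing tf.Transform-specific.

import pkg_resources

# Print the installed versions of the two packages the pipeline depends on,
# as seen by the interpreter that actually runs the Beam job.
for name in ('tensorflow-transform', 'apache-beam'):
    print('%s %s' % (name, pkg_resources.get_distribution(name).version))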

Upvotes: 3

Views: 1273

Answers (1)

Nikenano

Reputation: 33

I can't see your full code, but this line looks incomplete:

```
converter = csv_coder.CsvCoder(ordered_columns, RAW_DATA_METADATA.schema)
```

A possible way to do it:

```
import tensorflow as tf
from tensorflow_transform.tf_metadata import dataset_schema

INPUT_SCHEMA = dataset_schema.from_feature_spec({
    'label': tf.FixedLenFeature(shape=[], dtype=tf.float32),
    'id': tf.FixedLenFeature(shape=[], dtype=tf.float32),
    'date': tf.FixedLenFeature(shape=[], dtype=tf.string),
    'random': tf.FixedLenFeature(shape=[], dtype=tf.string),
    'name': tf.FixedLenFeature(shape=[], dtype=tf.string),
    'tweet': tf.FixedLenFeature(shape=[], dtype=tf.string),
})
```

```
from tensorflow_transform import coders

converter_input = coders.CsvCoder(
    ['label', 'id', 'date', 'random', 'name', 'tweet'],
    INPUT_SCHEMA,
    delimiter=',')  # use whatever delimiter your files actually use
```
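
For completeness, decoding a single line with this coder should give back an instance dict keyed by the column names; the sample line below is made up for illustration:

```
# Decode one CSV line into a dict of feature values; the line content is a
# hypothetical example matching the six columns declared above.
line = '1.0,42.0,2009-06-01,NO_QUERY,alice,hello world'
instance = converter_input.decode(line)
print(instance['tweet'])
```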

Then, for the transform step, where it seems your actual problem is, here is an example as well.

```
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema

TRANSFORM_INPUT_SCHEMA = dataset_schema.from_feature_spec({
    'id': tf.FixedLenFeature(shape=[], dtype=tf.float32),
    'label': tf.FixedLenFeature(shape=[], dtype=tf.float32),
    'tweet': tf.FixedLenFeature(shape=[], dtype=tf.string),
    'answer_to_nbr': tf.FixedLenFeature(shape=[], dtype=tf.float32),
    'nbr_of_tags': tf.FixedLenFeature(shape=[], dtype=tf.float32),
})
input_metadata = dataset_metadata.DatasetMetadata(schema=TRANSFORM_INPUT_SCHEMA)

train_dataset = (train_dataset, input_metadata)
transformed_dataset, transform_fn = (
    train_dataset
    | 'AnalyzeAndTransform' >> beam_impl.AnalyzeAndTransformDataset(
        preprocessing_fn))
```

Hope it helps you :) If you post your code to a GitHub repo, I could look at the full code and see if I can help! Good luck!

Take a look at this repo for help: https://github.com/Fematich/tftransform-demo

Upvotes: 1
