David Crook

Reputation: 2730

PySpark UDF - resulting DF fails to show: "ValueError: 'mycolumn' is not in list"

The scenario is very similar to this post with some variations: Pyspark Unsupported literal type class java.util.ArrayList

I have data of this format:

data.show()
+---------------+--------------------+--------------------+
|       features|                meta|           telemetry|
+---------------+--------------------+--------------------+
|   [seattle, 3]|[seattle, 3, 5344...|[[47, 1, 27, 92, ...|
|     [miami, 1]|[miami, 1, 236881...|[[31, 84, 24, 67,...|
|     [miami, 3]|[miami, 3, 02f4ca...|[[84, 5, 4, 93, 2...|
|   [seattle, 3]|[seattle, 3, ec48...|[[43, 16, 94, 93,...|
|   [seattle, 1]|[seattle, 1, 7d19...|[[70, 22, 45, 74,...|
|[kitty hawk, 3]|[kitty hawk, 3, d...|[[46, 15, 56, 94,...|

You can download a generated .json sample from this link: https://aiaccqualitytelcapture.blob.core.windows.net/streamanalytics/2019/08/21/10/0_43cbc7b0c9e845a187ce182b46eb4a3a_1.json?st=2019-08-22T15%3A20%3A20Z&se=2026-08-23T15%3A20%3A00Z&sp=rl&sv=2018-03-28&sr=b&sig=tsYh4oTNZXWbLnEgYypNqIsXH3BXOG8XyAH5ODi8iQg%3D

In particular, you can see that the data in each of these columns is actually a dictionary. The "features" column, which is the one of interest to us, has this form: {"factory_id":"seattle","line_id":"3"}

I'm attempting to one-hot encode the data in the features column via classical functional means.

See below:

import numpy as np

def one_hot(value, categories_list):
  # Index into the identity matrix to get the one-hot row for this category.
  num_cats = len(categories_list)
  one_hot = np.eye(num_cats)[categories_list.index(value)]
  return one_hot

def one_hot_features(row, feature_keys, u_features):
  """
  feature_keys must be sorted.
  """
  # Encode the first feature, then concatenate the encodings of the rest.
  cur_key = feature_keys[0]
  vector = one_hot(row["features"][cur_key], u_features[cur_key])
  for i in range(1, len(feature_keys)):
    cur_key = feature_keys[i]
    n_vector = one_hot(row["features"][cur_key], u_features[cur_key])
    vector = np.concatenate((vector, n_vector), axis=None)
  return vector

The feature_keys and u_features in this case contain the following data:

feature_keys = ['factory_id', 'line_id']

u_features = {'factory_id': ['kitty hawk', 'miami', 'nags head', 'seattle'], 'line_id': ['1', '2', '3']}
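
For a concrete check, the features value {"factory_id": "seattle", "line_id": "3"} should encode to [0, 0, 0, 1] for the factory concatenated with [0, 0, 1] for the line. A quick driver-side sketch with a plain Python dict standing in for a Spark row (sample_row is illustrative, not from the dataset):

sample_row = {"features": {"factory_id": "seattle", "line_id": "3"}}
print(one_hot_features(sample_row, feature_keys, u_features))
# [0. 0. 0. 1. 0. 0. 1.]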

I have created a UDF and am attempting to create a new dataframe with the new column added via this UDF. Code below:

from pyspark.sql.functions import col, udf

def calc_onehot_udf(feature_keys, u_features):
  return udf(lambda x: one_hot_features(x, feature_keys, u_features))

n_data = data.withColumn("hot_feature",
                         calc_onehot_udf(feature_keys, u_features)(col("features")))

n_data.show()

This results in the following errors:

Py4JJavaError: An error occurred while calling o148257.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 91.0 failed 4 times, most recent failure: Lost task 0.3 in stage 91.0 (TID 1404, 10.139.64.5, executor 1): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/types.py", line 1514, in __getitem__
    idx = self.fields.index(item)
ValueError: 'features' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 480, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 472, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 456, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 149, in dump_stream
    for obj in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 445, in _batched
    for item in iterator:
  File "", line 1, in
  File "/databricks/spark/python/pyspark/worker.py", line 87, in
    return lambda *a: f(*a)
  File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "", line 4, in
  File "", line 11, in one_hot_features
  File "/databricks/spark/python/pyspark/sql/types.py", line 1519, in __getitem__
    raise ValueError(item)
ValueError: features

Any assistance is greatly appreciated. I am actively investigating this.

The ideal output would be a new dataframe with a "hot_features" column that contains the one-dimensional one-hot-encoded array derived from the features column.

Upvotes: 0

Views: 2141

Answers (1)

David Crook

Reputation: 2730

Turns out that there were a few key problems:

  1. You should (arguably must) specify the return type in the UDF; in this case it is ArrayType(FloatType()).
  2. Instead of returning an ndarray from one_hot_features, I now return vector.tolist().
  3. Passing col("features") sends the value inside the features column row by row, not the whole row; therefore calling row["features"], as originally done, fails because the UDF already receives that row's value and there is no such accessor. I renamed the first parameter to "features_val" instead of "row" to better reflect the expected input (see the sketch after this list).
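
To make the third point concrete, here is a minimal sketch of what the UDF actually receives (the sample value is illustrative):

from pyspark.sql import Row

# col("features") hands the UDF the column value itself, e.g.:
x = Row(factory_id="seattle", line_id="3")

x["factory_id"]   # 'seattle' -- field access on the struct value works
# x["features"]   # would raise ValueError: 'features' -- the struct has no
                  # such field, which is exactly the error in the traceback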

New code for one_hot_features below.

def one_hot_features(features_val, feature_keys, u_features):
  cur_key = feature_keys[0]
  vector = one_hot(features_val[cur_key], u_features[cur_key])
  for i in range(1, len(feature_keys)):
    cur_key = feature_keys[i]
    n_vector = one_hot(features_val[cur_key], u_features[cur_key])
    vector = np.concatenate((vector, n_vector), axis=None)
  # Return a plain Python list so Spark can serialize it as ArrayType(FloatType()).
  return vector.tolist()

According to various other documentation I've found, numpy arrays don't play particularly well with Spark dataframes as of this writing, so it is best to convert them to the more generic Python types. This appears to have solved the problem faced here.
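
For illustration, a quick driver-side check of that conversion (values are illustrative):

import numpy as np

v = np.eye(3)[1]
print(type(v), v.dtype)  # <class 'numpy.ndarray'> float64 -- numpy types
print(v.tolist())        # [0.0, 1.0, 0.0] -- plain Python floats, which map
                         # cleanly onto ArrayType(FloatType())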

Updated code for the UDF definition below:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, FloatType

def calc_onehot_udf(feature_keys, u_features):
  # Declaring the return type tells Spark how to serialize the result.
  return udf(lambda x: one_hot_features(x, feature_keys, u_features),
             ArrayType(FloatType()))

n_data = data.withColumn("hot_feature",
                         calc_onehot_udf(feature_keys, u_features)(col("features")))
n_data.show()
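
If everything is wired up correctly, the new column should appear in the schema as an array of floats; a sketch of the expected (abridged) output:

n_data.printSchema()
# root
#  |-- features: struct (nullable = true)
#  |    |-- factory_id: string (nullable = true)
#  |    |-- line_id: string (nullable = true)
#  ...
#  |-- hot_feature: array (nullable = true)
#  |    |-- element: float (containsNull = true)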

Good luck if you face this problem; hopefully documenting it here helps.

Upvotes: 0
