Reputation: 2730
The scenario is very similar to this post with some variations: Pyspark Unsupported literal type class java.util.ArrayList
I have data of this format:
data.show()
+---------------+--------------------+--------------------+
| features| meta| telemetry|
+---------------+--------------------+--------------------+
| [seattle, 3]|[seattle, 3, 5344...|[[47, 1, 27, 92, ...|
| [miami, 1]|[miami, 1, 236881...|[[31, 84, 24, 67,...|
| [miami, 3]|[miami, 3, 02f4ca...|[[84, 5, 4, 93, 2...|
| [seattle, 3]|[seattle, 3, ec48...|[[43, 16, 94, 93,...|
| [seattle, 1]|[seattle, 1, 7d19...|[[70, 22, 45, 74,...|
|[kitty hawk, 3]|[kitty hawk, 3, d...|[[46, 15, 56, 94,...|
You can download a generated .json sample from this link: https://aiaccqualitytelcapture.blob.core.windows.net/streamanalytics/2019/08/21/10/0_43cbc7b0c9e845a187ce182b46eb4a3a_1.json?st=2019-08-22T15%3A20%3A20Z&se=2026-08-23T15%3A20%3A00Z&sp=rl&sv=2018-03-28&sr=b&sig=tsYh4oTNZXWbLnEgYypNqIsXH3BXOG8XyAH5ODi8iQg%3D
In particular, you can see that the actual data in each of these columns is a dictionary. The "features" column, which is the one of interest to us, has this form: {"factory_id":"seattle","line_id":"3"}
I'm attempting to one-hot encode the data in features with plain Python functions.
See below:
import numpy as np

def one_hot(value, categories_list):
    # Pick the row of an identity matrix at value's index in the category list.
    num_cats = len(categories_list)
    one_hot = np.eye(num_cats)[categories_list.index(value)]
    return one_hot
def one_hot_features(row, feature_keys, u_features):
    """
    feature_keys must be sorted.
    """
    cur_key = feature_keys[0]
    vector = one_hot(row["features"][cur_key], u_features[cur_key])
    for i in range(1, len(feature_keys)):
        cur_key = feature_keys[i]
        n_vector = one_hot(row["features"][cur_key], u_features[cur_key])
        vector = np.concatenate((vector, n_vector), axis=None)
    return vector
The feature_keys and u_features in this case contain the following data:
feature_keys = ['factory_id', 'line_id']
u_features = {'factory_id': ['kitty hawk', 'miami', 'nags head', 'seattle'], 'line_id': ['1', '2', '3']}
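To make the expected output concrete, here is the same logic run outside Spark on a plain dict standing in for one row (sample values taken from the table above):

```python
import numpy as np

def one_hot(value, categories_list):
    # Pick the row of an identity matrix at value's index in the category list.
    num_cats = len(categories_list)
    return np.eye(num_cats)[categories_list.index(value)]

def one_hot_features(row, feature_keys, u_features):
    # feature_keys must be sorted; concatenates one one-hot block per key.
    cur_key = feature_keys[0]
    vector = one_hot(row["features"][cur_key], u_features[cur_key])
    for i in range(1, len(feature_keys)):
        cur_key = feature_keys[i]
        n_vector = one_hot(row["features"][cur_key], u_features[cur_key])
        vector = np.concatenate((vector, n_vector), axis=None)
    return vector

feature_keys = ['factory_id', 'line_id']
u_features = {'factory_id': ['kitty hawk', 'miami', 'nags head', 'seattle'],
              'line_id': ['1', '2', '3']}

row = {"features": {"factory_id": "seattle", "line_id": "3"}}
print(one_hot_features(row, feature_keys, u_features))
# → [0. 0. 0. 1. 0. 0. 1.]
```

"seattle" is position 3 of 4 factory ids and "3" is position 2 of 3 line ids, so the two blocks concatenate to a 7-element vector.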
I have created a udf and am attempting to create a new dataframe with the new column added using this udf. Code below:
from pyspark.sql.functions import col, udf

def calc_onehot_udf(feature_keys, u_features):
    return udf(lambda x: one_hot_features(x, feature_keys, u_features))

n_data = data.withColumn("hot_feature",
                         calc_onehot_udf(feature_keys, u_features)(col("features")))
n_data.show()
This results in the following errors:
Py4JJavaError: An error occurred while calling o148257.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 91.0 failed 4 times, most recent failure: Lost task 0.3 in stage 91.0 (TID 1404, 10.139.64.5, executor 1): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/types.py", line 1514, in __getitem__
    idx = self.__fields__.index(item)
ValueError: 'features' is not in list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 480, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 472, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 456, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 149, in dump_stream
    for obj in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 445, in _batched
    for item in iterator:
  File "", line 1, in
  File "/databricks/spark/python/pyspark/worker.py", line 87, in
    return lambda *a: f(*a)
  File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "", line 4, in
  File "", line 11, in one_hot_features
  File "/databricks/spark/python/pyspark/sql/types.py", line 1519, in __getitem__
    raise ValueError(item)
ValueError: features
Any assistance is greatly appreciated. I am actively investigating this.
The ideal output would be a new dataframe with a "hot_feature" column containing the one-dimensional one-hot-encoded array derived from the features column.
Upvotes: 0
Views: 2141
Reputation: 2730
It turns out there were a few key problems: the UDF receives the value of the features column directly (not the whole row), numpy arrays need converting to plain Python lists, and the udf needs an explicit return type.
New code for one_hot_features below:
def one_hot_features(features_val, feature_keys, u_features):
    cur_key = feature_keys[0]
    vector = one_hot(features_val[cur_key], u_features[cur_key])
    for i in range(1, len(feature_keys)):
        cur_key = feature_keys[i]
        n_vector = one_hot(features_val[cur_key], u_features[cur_key])
        vector = np.concatenate((vector, n_vector), axis=None)
    return vector.tolist()  # plain Python list, not a numpy array
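The signature change is the first fix: Spark hands the UDF the value of the features column (a struct/Row), not the whole row, so the function now indexes features_val directly. A plain dict standing in for that struct value (hypothetical data; a pyspark Row behaves the same for key-style lookups, though it raises ValueError rather than KeyError) illustrates why the old version failed:

```python
# Dict stand-in for the struct value that Spark passes to the UDF.
features_val = {"factory_id": "seattle", "line_id": "3"}

# The original code assumed it had the whole row and did row["features"][key];
# on the column value itself there is no "features" key to look up:
try:
    features_val["features"]
except KeyError:
    print("no 'features' key - the UDF already receives the struct value")

# Indexing by the feature key directly is what works:
print(features_val["factory_id"])  # → seattle
```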
From other documentation I've found, it appears that numpy arrays don't play particularly well with Spark dataframes as of this writing, and therefore it is best to convert them to the more generic Python types (hence the .tolist() call). This appears to have solved the problem faced here.
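A quick check of that conversion: .tolist() turns numpy float64 values into plain Python floats, which Spark can then map onto ArrayType(FloatType()):

```python
import numpy as np

# Same 7-element vector shape as the seattle/3 example above.
vec = np.concatenate((np.eye(4)[3], np.eye(3)[2]), axis=None)
print(type(vec))         # a numpy ndarray - not a type Spark serializes as a column value

as_list = vec.tolist()
print(as_list)           # → [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0]
print(type(as_list[0]))  # plain Python float
```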
updated code for the udf definition below:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, FloatType

def calc_onehot_udf(feature_keys, u_features):
    return udf(lambda x: one_hot_features(x, feature_keys, u_features),
               ArrayType(FloatType()))

n_data = data.withColumn("hot_feature",
                         calc_onehot_udf(feature_keys, u_features)(col("features")))
n_data.show()
Good luck if you face this problem; hopefully documenting it here helps.
Upvotes: 0