Reputation: 41
When transforming data with an ML Pipeline that uses VectorAssembler, I am hitting a "Param handleInvalid does not exist" error. Why does this happen? Am I missing something? I am new to PySpark.
I am using the following code to combine a given list of columns into a single vector column:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler

categoricalColumns = ['frequency', 'type_disp', 'type_card']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index').setHandleInvalid("keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol='response', outputCol='label')
stages += [label_stringIdx]

numericCols = ['date_acct_', 'date_loan_', 'amount', 'duration', 'payments', 'birth_number_', 'min1', 'max1', 'mean1', 'min2', 'max2', 'mean2', 'min3', 'max3', 'mean3', 'min4', 'max4', 'mean4', 'min5', 'max5', 'mean5', 'min6', 'max6', 'mean6', 'gen', 'has_card']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="feature")
print(assembler)
stages += [assembler]
df_features is the main data frame in which I keep all the columns. I have tried both handleInvalid = 'keep' and handleInvalid = 'skip', but unfortunately I get the same error.
I am getting the following error:
Traceback (most recent call last):
File "spark_model_exp_.py", line 275, in <module>
feature_df = assembler.transform(features)
File "/usr/local/lib/python3.6/site-packages/pyspark/ml/base.py", line 173, in transform
return self._transform(dataset)
File "/usr/local/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 311, in _transform
self._transfer_params_to_java()
File "/usr/local/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 124, in _transfer_params_to_java
pair = self._make_java_param_pair(param, self._paramMap[param])
File "/usr/local/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 113, in _make_java_param_pair
java_param = self._java_obj.getParam(param.name)
File "/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o1072.getParam.
: java.util.NoSuchElementException: Param handleInvalid does not exist.
at org.apache.spark.ml.param.Params$$anonfun$getParam$2.apply(params.scala:729)
at org.apache.spark.ml.param.Params$$anonfun$getParam$2.apply(params.scala:729)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.ml.param.Params$class.getParam(params.scala:728)
at org.apache.spark.ml.PipelineStage.getParam(Pipeline.scala:43)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:745)
What did I try before?
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler

categoricalColumns = ['frequency', 'type_disp', 'type_card']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index').setHandleInvalid("keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol='response', outputCol='label')
stages += [label_stringIdx]

numericCols = ['date_acct_', 'date_loan_', 'amount', 'duration', 'payments', 'birth_number_', 'min1', 'max1', 'mean1', 'min2', 'max2', 'mean2', 'min3', 'max3', 'mean3', 'min4', 'max4', 'mean4', 'gen', 'has_card']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="feature")
stages += [assembler]
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(features)
features = pipelineModel.transform(features)
features.show(n=2)
selectedCols = ['label', 'feature'] + cols
features = features.select(selectedCols)
print(features.dtypes)
With the above code, i.e. going through a Pipeline, I no longer get the error at the VectorAssembler transform call, but I get the same error (Param handleInvalid does not exist) when the Pipeline model's transform is called.
Can anyone shed more light on this? Is there an alternative way to achieve the same result?
EDIT: I found a partial answer to why this is happening: my local Spark version is 2.4, where the code works fine, but the cluster's Spark version is 2.3, and handleInvalid was only introduced on VectorAssembler in 2.4, hence the error.
But I am puzzled: I have checked that there are no NULL/NaN values in the dataframe, so how does VectorAssembler end up calling the handleInvalid param at all? Can I bypass this implicit call to handleInvalid so that I don't hit this error, or is there any other option short of upgrading the cluster's Spark version from 2.3 to 2.4?
Can anyone please suggest on this?
Upvotes: 2
Views: 1408
Reputation: 41
I finally resolved this by using RFormula, which removes the need for StringIndexer, VectorAssembler, and Pipeline: RFormula does all of that in the background. https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/RFormula.html
formula = RFormula(formula='response ~ .', featuresCol='features', labelCol='label')
label_df = formula.fit(df_features).transform(df_features)
where response is the label column and df_features is your entire set of features.
Upvotes: 2