Reputation: 1995
I am new to Apache Spark and MLlib, and I am trying to build a model for supervised classification over a dataset. The problem is that all the examples I have found on the internet explain how to work with regression problems, and always with numerical values. My context is the following:
I have installed the HDP distribution (Hortonworks) and I am working from Zeppelin with a pyspark interpreter.
I have a DataFrame with some attributes of type 'double' and 'string', and the label I want to predict is a string ('yes' or 'no'). Here is what I have done so far:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.mllib.regression import LabeledPoint
# I get the data
sqlContext = sqlc
df = sqlContext.sql("SELECT string1, double1, string2, double2, label_to_predict FROM HIVE_TABLE")
temp = df.map(lambda line:LabeledPoint(line[0],[line[1:]]))
temp.take(5)
Here I get an error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 19, dlladatanaly02.orona.es): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/current/spark-client/python/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/hdp/current/spark-client/python/pyspark/rdd.py", line 1293, in takeUpToNumLeft
yield next(iterator)
File "<string>", line 9, in <lambda>
File "/usr/hdp/current/spark-client/python/pyspark/mllib/regression.py", line 52, in __init__
self.features = _convert_to_vector(features)
File "/usr/hdp/current/spark-client/python/pyspark/mllib/linalg/__init__.py", line 71, in _convert_to_vector
return DenseVector(l)
File "/usr/hdp/current/spark-client/python/pyspark/mllib/linalg/__init__.py", line 274, in __init__
ar = np.array(ar, dtype=np.float64)
ValueError: setting an array element with a sequence.
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/current/spark-client/python/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/hdp/current/spark-client/python/pyspark/rdd.py", line 1293, in takeUpToNumLeft
yield next(iterator)
File "<string>", line 9, in <lambda>
File "/usr/hdp/current/spark-client/python/pyspark/mllib/regression.py", line 52, in __init__
self.features = _convert_to_vector(features)
File "/usr/hdp/current/spark-client/python/pyspark/mllib/linalg/__init__.py", line 71, in _convert_to_vector
return DenseVector(l)
File "/usr/hdp/current/spark-client/python/pyspark/mllib/linalg/__init__.py", line 274, in __init__
ar = np.array(ar, dtype=np.float64)
ValueError: setting an array element with a sequence.
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError(u'An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.\n', JavaObject id=o287), <traceback object at 0x13cc098>)
I supposed that the values must be numerical, so I tried this cast:
temp = df.map(lambda line:LabeledPoint(float(line[0]),[float(line[1:])]))
But then I get this error:
TypeError: float() argument must be a string or a number
So, my question is:
If I want to build a classifier that predicts a string (nominal) value from attributes that are both numerical and string, how can I do it? (Suppose I want to use, for example, a RandomForest or SVM model.)
Upvotes: 0
Views: 785
Reputation:
You cannot use string values directly. All variables have to be indexed and/or encoded as numbers before they can be used with Spark ML/MLlib.
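To make the indexing idea concrete, here is a minimal plain-Python sketch that runs without Spark. It is only an illustration under stated assumptions: the sample rows and the helper names `build_index` and `encode` are made up, and the frequency-based ordering is just one possible convention. It also sidesteps two things in the question's code that look problematic: `[line[1:]]` wraps the row slice inside another list, so the feature vector is nested (the likely source of "setting an array element with a sequence"), and `float(line[1:])` is applied to a whole slice rather than to one value. Note too that `label_to_predict` is the last selected column, not `line[0]`.

```python
# Plain-Python sketch of string indexing; no Spark required.
# Rows mimic the query's column order:
# (string1, double1, string2, double2, label_to_predict). Sample data is invented.
rows = [
    ("a", 1.0, "x", 10.0, "yes"),
    ("b", 2.0, "y", 20.0, "no"),
    ("a", 3.0, "x", 30.0, "yes"),
]

def build_index(values):
    """Map each distinct string to a float index.

    Most frequent value gets 0.0; ties are broken alphabetically.
    (Hypothetical helper, chosen for illustration.)
    """
    counts = {}
    for v in values:
        counts[v] = counts.get(v, 0) + 1
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}

# One index per string column, including the label column.
string1_index = build_index(r[0] for r in rows)
string2_index = build_index(r[2] for r in rows)
label_index = build_index(r[4] for r in rows)

def encode(row):
    """Return (numeric label, flat list of numeric features) for one row."""
    label = label_index[row[4]]
    # A flat list of floats: no nested sequences, no raw strings.
    features = [string1_index[row[0]], row[1], string2_index[row[2]], row[3]]
    return label, features

encoded = [encode(r) for r in rows]
for label, features in encoded:
    print(label, features)
```

Each `(label, features)` pair has the shape that `LabeledPoint(label, features)` expects. In Spark itself, `StringIndexer` and `VectorAssembler` from `pyspark.ml.feature` cover the same steps on a DataFrame, and with the RDD-based API the `categoricalFeaturesInfo` argument of `RandomForest.trainClassifier` is the place to declare which feature positions hold category indices; check the documentation for your Spark version before relying on either.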
Check:
Upvotes: 0