Reputation: 11
I am very new to Spark programming. I am trying to apply a map and a reduceByKey to the following data set with 15 fields.
rdd = sc.parallelize([
    ("West", "Apple", 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 2.0, 10),
    ("West", "Apple", 3.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0, 10, 2.0)])
This is my map function, where I am trying to create a tuple with a composite key and multiple values:
rdd1 = rdd.map(lambda x: ((x[0],x[1]),(x[2],x[3],x[4],x[5],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14])))
As the next step, I am trying to use reduceByKey to implement a SQL-like aggregate on the values in the above tuple:
rdd2 = rdd1.reduceByKey(lambda x,y: (x[1]+','+y[1]))
This reduce function works as expected for tuple indices 0-4, but when I try indices 5-14, I get an IndexError:
rdd2 = rdd1.reduceByKey(lambda x,y: (x[10]+','+y[10]))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark/python/pyspark/rdd.py", line 1277, in take
res = self.context.runJob(self, takeUpToNumLeft, p, True)
File "/opt/spark/python/pyspark/context.py", line 897, in runJob
allowLocal)
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
15/08/28 01:23:22 WARN TaskSetManager: Lost task 1.0 in stage 78.0 (TID 91, localhost): TaskKilled (killed intentionally)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 78.0 failed 1 times, most recent failure: Lost task 0.0 in stage 78.0 (TID 90, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/spark/python/pyspark/rdd.py", line 2330, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/spark/python/pyspark/rdd.py", line 2330, in pipeline_func
return func(split, prev_func(split, iterator))
File "/opt/spark/python/pyspark/rdd.py", line 316, in func
return f(iterator)
File "/opt/spark/python/pyspark/rdd.py", line 1758, in combineLocally
merger.mergeValues(iterator)
File "/opt/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 268, in mergeValues
d[k] = comb(d[k], v) if k in d else creator(v)
File "<stdin>", line 1, in <lambda>
IndexError: string index out of range
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:315)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
This looks like a very non-trivial error. I am not sure whether it is caused by my machine's hardware, by my implementation of the reduce function, or by something in Spark itself.
Any sort of help is appreciated.
Upvotes: 1
Views: 5379
Reputation: 12234
File "<stdin>", line 1, in <lambda>
IndexError: string index out of range
The error occurs in your lambda function. The message says "string index", so the object being indexed is a string, and it has fewer characters than the index your function asks for.
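One likely way the lambda ends up indexing a short sequence: reduceByKey passes the result of one call back in as x for the next call on the same key, so a function that takes tuples but returns a string will index that string the second time around. The sketch below reproduces this in plain Python with functools.reduce instead of Spark. The names values, merge_by_index and join_field are made up for illustration, and the fields are assumed to be strings (with the numeric literals shown in the question, x[1] + ',' would raise a TypeError instead).

```python
from functools import reduce

# Hypothetical value tuples for a single key, shaped like the 13-field
# values built by the map step. Strings are assumed so that + ',' + is valid.
values = [
    ("2.0", "10", "2.0", "10", "2.0", "10", "2.0", "10", "2.0", "10", "2.0", "2.0", "10"),
    ("3.0", "10", "2.0", "10", "2.0", "10", "2.0", "10", "2.0", "10", "2.0", "10", "2.0"),
    ("4.0", "10", "2.0", "10", "2.0", "10", "2.0", "10", "2.0", "10", "2.0", "10", "2.0"),
]

def merge_by_index(i):
    """Build a merge function shaped like the lambda in the question."""
    return lambda x, y: x[i] + ',' + y[i]

# Index 1 does not raise, but the second call indexes the string "10,10"
# (the previous result), not a tuple, so the output is silently wrong.
print(reduce(merge_by_index(1), values))  # prints 0,10

# Index 10 fails: the first call returns "2.0,2.0" (7 characters),
# and the second call then evaluates "2.0,2.0"[10].
try:
    reduce(merge_by_index(10), values)
except IndexError as exc:
    print("IndexError:", exc)  # prints IndexError: string index out of range

def join_field(rows, i):
    """Pick field i first, then merge strings with strings."""
    return reduce(lambda a, b: a + ',' + b, (row[i] for row in rows))

print(join_field(values, 10))  # prints 2.0,2.0,2.0
```

In PySpark the same idea would be to select the field in the map step and keep the reduce function string-to-string, roughly rdd.map(lambda x: ((x[0], x[1]), x[12])).reduceByKey(lambda a, b: a + ',' + b), where x[12] is the original record field that sits at value index 10. This assumes the fields are strings and has not been run against the setup in the question.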
Upvotes: 2