Reputation: 304
I'm just trying to run the sample code for stateful streaming, but it fails with an error, and I can't figure out why it occurs.
Spark 2.3 with Python 3.6 on a Cloudera VM 5.13.3.
Running options:
--master local[*] --queue PyCharmSpark pyspark-shell
My code is:
from pyspark import SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import *

conf = (SparkConf()
        .setAppName("ch2_dstreams_t1.py"))

spark = SparkSession.builder \
    .appName(" ") \
    .config(conf=conf) \
    .getOrCreate()

# Create a local StreamingContext with a batch interval of 10 seconds
sc = spark.sparkContext
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")

# define the update function
def updatetotalcount(currentcount, countstate):
    if countstate is None:
        countstate = 0
    return sum(currentcount, countstate)

# Create a DStream that will connect to hostname:port, here localhost:7777
lines = ssc.socketTextStream("localhost", 7777)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
totalCounts = wordCounts.updateStateByKey(updatetotalcount)
totalCounts.pprint()

# Start the computation
ssc.start()

# Wait for the computation to terminate
ssc.awaitTermination()
The stream starts and listens on the socket, but when I type lines into the terminal, the application fails with an error:
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
-------------------------------------------
Time: 2018-08-27 11:40:10
-------------------------------------------
[Stage 0:> (0 + 1) / 1]18/08/27 11:40:15 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:15 WARN storage.BlockManager: Block input-0-1535395215600 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395215800 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216000 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216200 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216400 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:20 WARN storage.BlockManager: Putting block rdd_30_0 failed due to exception org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
process()
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given
.
18/08/27 11:40:20 WARN storage.BlockManager: Block rdd_30_0 could not be removed as it was not found on disk or in memory
18/08/27 11:40:20 ERROR executor.Executor: Exception in task 0.0 in stage 8.0 (TID 22)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
process()
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/08/27 11:40:20 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 8.0 (TID 22, localhost, executor driver): org.apache.spark.api.python.PythonException (same traceback and stack trace as above)
The root cause is probably my updatetotalcount function: when I comment out the updateStateByKey(updatetotalcount) transformation (sketched after the traceback below), the application prints results correctly. The relevant part of the traceback is:
File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given
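For reference, the diagnostic change that makes it run is roughly this (a sketch; only the stateful step is disabled, the rest of the pipeline is unchanged):

# with updateStateByKey commented out, per-batch counts print fine
# totalCounts = wordCounts.updateStateByKey(updatetotalcount)
wordCounts.pprint()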
Please advise: why am I getting this error?
Upvotes: 0
Views: 1008
Reputation: 304
The issue was using

from pyspark.sql.functions import *

which shadows Python's built-in functions: after this import, sum no longer refers to the built-in but to pyspark.sql.functions.sum, which takes a single Column argument. That is also why the error mentions _(): in Spark 2.x, pyspark.sql.functions generates these functions through an internal wrapper named _. If you want to use pyspark.sql.functions, import it under a namespace instead, e.g.

import pyspark.sql.functions as f
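For completeness, a minimal sketch of the working version of the update function (same logic as in the question; the only change is dropping the wildcard import, so sum is the Python built-in sum(iterable, start) again):

import pyspark.sql.functions as f  # namespaced import: Python built-ins stay intact

def updatetotalcount(currentcount, countstate):
    # currentcount: list of new counts for the key in this batch
    # countstate: running total so far (None on the first batch)
    if countstate is None:
        countstate = 0
    # built-in sum(iterable, start) adds the new counts to the running total
    return sum(currentcount, countstate)

Alternatively, return sum(currentcount) + (countstate or 0) avoids relying on sum's start argument altogether.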
Upvotes: 0