Dipas

Reputation: 304

pyspark updateStateByKey fails when calling my function

I'm just trying to run sample code for stateful streaming, but it fails with an error, and I can't work out why it occurs.

Spark 2.3 with Python 3.6 on a Cloudera VM 5.13.3.

Running options:

--master local[*] --queue PyCharmSpark pyspark-shell 

My code is:

from pyspark import SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import *

conf = (SparkConf()
        .setAppName("ch2_dstreams_t1.py"))

spark = SparkSession.builder \
     .appName(" ") \
     .config(conf=conf) \
     .getOrCreate()


# Create a local StreamingContext with a batch interval of 10 seconds
sc = spark.sparkContext
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")

# define the update function
def updatetotalcount(currentcount, countstate):
    # currentcount: new counts for this key in the batch;
    # countstate: previous running total (None on the first batch)
    if countstate is None:
        countstate = 0
    return sum(currentcount, countstate)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 7777)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))

wordCounts = pairs.reduceByKey(lambda x, y: x + y)

totalCounts = wordCounts.updateStateByKey(updatetotalcount)

totalCounts.pprint()

# Start the computation
ssc.start()

# Wait for the computation to terminate
ssc.awaitTermination()

The stream is up and listening on the socket, but as soon as I type lines into the terminal, the application fails with an error:

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
-------------------------------------------
Time: 2018-08-27 11:40:10
-------------------------------------------

[Stage 0:>                                                          (0 + 1) / 1]18/08/27 11:40:15 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:15 WARN storage.BlockManager: Block input-0-1535395215600 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395215800 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216000 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216200 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216400 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:20 WARN storage.BlockManager: Putting block rdd_30_0 failed due to exception org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
  File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
    return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given
.
18/08/27 11:40:20 WARN storage.BlockManager: Block rdd_30_0 could not be removed as it was not found on disk or in memory
18/08/27 11:40:20 ERROR executor.Executor: Exception in task 0.0 in stage 8.0 (TID 22)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
  File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
    return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/08/27 11:40:20 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 8.0 (TID 22, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
  File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
    return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The root cause is probably in my function updatetotalcount: when I comment out the updateStateByKey(updatetotalcount) transformation, the results are printed correctly. The failing lines of the traceback are:

File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
        return sum(currentcount, countstate)
    TypeError: _() takes 1 positional argument but 2 were given

Please advise: why am I getting this error?

Upvotes: 0

Views: 1008

Answers (1)

Dipas

Reputation: 304

The issue was the wildcard import

from pyspark.sql.functions import *

which shadows Python built-ins. Here the built-in sum is replaced by pyspark.sql.functions.sum, which takes a single column argument, hence the TypeError: _() takes 1 positional argument but 2 were given. If you want to use pyspark.sql.functions, import it under a namespace instead, e.g.:

import pyspark.sql.functions as f
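
You can reproduce the name clash outside Spark. A minimal sketch, assuming only a local PySpark installation; after the wildcard import, the name sum no longer refers to the Python built-in:

>>> sum([1, 2], 0)   # Python built-in: sum(iterable, start)
3
>>> from pyspark.sql.functions import *
>>> sum([1, 2], 0)   # now pyspark.sql.functions.sum, which takes one column
TypeError: _() takes 1 positional argument but 2 were given

For completeness, here is a sketch of the corrected script from the question (same host, port, and batch interval; only the import changes):

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.functions as f  # namespaced, so built-ins are not shadowed

conf = SparkConf().setAppName("ch2_dstreams_t1.py")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

sc = spark.sparkContext
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")

def updatetotalcount(currentcount, countstate):
    if countstate is None:
        countstate = 0
    return sum(currentcount, countstate)  # the Python built-in again

lines = ssc.socketTextStream("localhost", 7777)
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
totalCounts = wordCounts.updateStateByKey(updatetotalcount)
totalCounts.pprint()

ssc.start()
ssc.awaitTermination()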

Upvotes: 0
