Reputation: 24508
The Spark script below prints out the top 10 words in a txt file.
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf().setAppName("read text file in pyspark")
sc = SparkContext(conf=conf)
path_to_file = "C:/Users/Admin/Desktop/beeline_project/lorem2.txt"
path_to_save_result = "C:/Users/Admin/Desktop/beeline_project/output/"
words = sc.textFile(path_to_file).flatMap(lambda line: line.split(" "))
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b:a +b)
output = wordCounts.collect()
output = sorted(output, key = lambda tup: tup[1], reverse=True)
output = output[:10]
for (word, count) in output:
    print("%s: %i" % (word, count))
sc.stop()
But then it gives this error; I just want the program to stop cleanly. At the end it says "no connection made - machine refused it", but I did not make any connections anywhere.
Could it be because I am running the PySpark job without a main() (roughly like the sketch after the traceback below)? Does that affect the output?
SUCCESS: The process with PID 13904 (child process of PID 6048) has been terminated.
SUCCESS: The process with PID 6048 (child process of PID 13948) has been terminated.
SUCCESS: The process with PID 13948 (child process of PID 8892) has been terminated.
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 1152, in send_command
answer = smart_decode(self.stream.readline()[:-1])
File "C:\python37\lib\socket.py", line 589, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 985, in send_command
response = connection.send_command(command)
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 1164, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:62315)
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 929, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 1067, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:62315)
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 929, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 1067, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
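For reference, this is roughly what I mean by running it with a main(): the same job wrapped in the usual __name__ guard (not sure whether it actually changes anything):

from pyspark import SparkConf, SparkContext

def main():
    # same word count job as above, just wrapped in a function
    conf = SparkConf().setAppName("read text file in pyspark")
    sc = SparkContext(conf=conf)
    path_to_file = "C:/Users/Admin/Desktop/beeline_project/lorem2.txt"
    words = sc.textFile(path_to_file).flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    output = sorted(wordCounts.collect(), key=lambda tup: tup[1], reverse=True)[:10]
    for (word, count) in output:
        print("%s: %i" % (word, count))
    sc.stop()

if __name__ == "__main__":
    main()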
EDIT:
Using Spark 2.4.6, Scala 2.11.12, Java 8, Python 3.7.
Upvotes: 0
Views: 5978
Reputation: 19328
Try this:
from collections import Counter
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
path_to_file = "C:/Users/Admin/Desktop/beeline_project/lorem2.txt"
path_to_save_result = "C:/Users/Admin/Desktop/beeline_project/output/"
df = spark.read.text(path_to_file)
words = df.rdd.flatMap(lambda row: row[0].split(" ")).collect()
# words is now a plain Python list on the driver, so finish the counting with regular Python
wordCounts = Counter(words)
for (word, count) in wordCounts.most_common(10):
    print("%s: %i" % (word, count))
Note that this approach only uses Spark to read a file and convert it to a list of strings. Collecting all data to the driver node is slow and will error out if the dataset is large. This code might work, but it's definitely not how you'd perform the analysis "the Spark way". You should look into performing this analysis with Spark DataFrames / the native Spark functions, so it could be executed on multiple nodes of a cluster in parallel and harness the power of the Spark engine.
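For example, here's a rough sketch of the same top 10 word count done with the built-in DataFrame functions (spark.read.text gives a single "value" column; the "word" alias below is just illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split

spark = SparkSession.builder.getOrCreate()

path_to_file = "C:/Users/Admin/Desktop/beeline_project/lorem2.txt"

# one row per word: split each line on spaces and explode into separate rows
words_df = (
    spark.read.text(path_to_file)
    .select(explode(split(col("value"), " ")).alias("word"))
)

# count, sort descending, and keep the 10 most frequent words
words_df.groupBy("word").count().orderBy(col("count").desc()).limit(10).show()

This keeps the aggregation inside Spark, so only the final 10 rows ever reach the driver.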
Upvotes: 1