Kamil

Reputation: 149

Python Spark Streaming example with textFileStream does not work. Why?

I am using Spark 1.3.1 and Python 2.7.

This is my first experience with Spark Streaming.

I am trying an example that reads data from a file using Spark Streaming.

This is the link to the example: https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py

My code is the following:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
     .setMaster("local")
     .setAppName("My app")
     .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)  # 1-second batch interval
lines = ssc.textFileStream('../inputs/2.txt')
counts = lines.flatMap(lambda line: line.split(" "))\
          .map(lambda x: (x, 1))\
          .reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()

The content of the 2.txt file is the following:

a1 b1 c1 d1 e1 f1 g1
a2 b2 c2 d2 e2 f2 g2
a3 b3 c3 d3 e3 f3 g3

I expect something related to the file content to appear in the console, but there is nothing. Nothing except text like this every second:

-------------------------------------------
Time: 2015-09-03 15:08:18
-------------------------------------------

and Spark's logs.

Am I doing something wrong? If not, why does it not work?

Upvotes: 3

Views: 7521

Answers (4)

lokesh

Reputation: 37

JSON data:

{"timestamp": "1571053218000","t1": "55.23","t2": "10","t3": "ON"}

{"timestamp": "1571053278000","t1": "63.23","t2": "11","t3": "OFF"}

{"timestamp": "1571053338000","t1": "73.23","t2": "12","t3": "ON"}

{"timestamp": "1571053398000","t1": "83.23","t2": "13","t3": "ON"}

PySpark code to read the above JSON data:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import col

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
ssc = StreamingContext(sc, 5)  # 5-second batch interval

stream_data = ssc.textFileStream("/filepath/")


def readMyStream(rdd):
  # Each micro-batch arrives as an RDD of JSON lines; skip empty batches
  if not rdd.isEmpty():
    df = spark.read.json(rdd)
    print('Started the Process')
    print('Selection of Columns')
    df = df.select('t1', 't2', 't3', 'timestamp').where(col("timestamp").isNotNull())
    df.show()


stream_data.foreachRDD(lambda rdd: readMyStream(rdd))
ssc.start()
# Keep the stream running; calling ssc.stop() right after start()
# would shut the streaming context down before any batch is processed.
ssc.awaitTermination()

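Here foreachRDD turns each micro-batch RDD of JSON lines into a DataFrame via spark.read.json, so the DataFrame API can be used on streaming data. As with the code in the question, the path passed to textFileStream must be a directory that new files are moved into while the stream is running.
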
Upvotes: 1

Jubin Thomas

Reputation: 1

If you are running this from a Jupyter notebook, you need to start the streaming program first, and only then upload the text file into the monitored directory through Jupyter.

Upvotes: 0

Abhi

Reputation: 1255

I faced a similar issue, and what I realized is that once streaming is running, the StreamingContext picks up data only from new files. It ingests only files newly placed in the source directory after the streaming is up.

Actually, the PySpark documentation makes this very explicit:

textFileStream(directory)

Create an input stream that monitors a Hadoop-compatible file system for new files and reads them as text files. Files must be written to the monitored directory by “moving” them from another location within the same file system. File names starting with . are ignored.
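
In practice this means writing the file somewhere else first and then moving it into the monitored directory in one step. A minimal sketch of that pattern (the staging and input paths here are hypothetical):

import os

# Write the file in a staging directory on the same file system first...
with open('/data/staging/2.txt', 'w') as f:
    f.write('a1 b1 c1 d1 e1 f1 g1\n')

# ...then move it into the monitored directory with a single atomic rename,
# so the stream sees a complete new file instead of a half-written one.
os.rename('/data/staging/2.txt', '/data/inputs/2.txt')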

Upvotes: 3

Kamil

Reputation: 149

I found the problem!

I guess the problem was in the file system behaviour. I use a Mac.

My program did not see a file if I just copied it into the folder. When I created the file in the folder and entered data into it afterwards, my program saw the file, but it was empty.

Finally, my program saw the file and everything inside it when I created the file elsewhere and copied it into the scanned directory, doing so during a period when the directory was not being scanned.

Also, in the code in the question I scanned a file, but I should scan a directory.
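
Put together, the fix is to point textFileStream at the directory and move complete files into it while the stream is running. A minimal sketch based on the code from the question (the input directory path is an assumption):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
     .setMaster("local")
     .setAppName("My app")
     .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

# Monitor the directory, not a single file; only files that appear
# in it after the stream starts are picked up.
lines = ssc.textFileStream('../inputs/')
counts = lines.flatMap(lambda line: line.split(" "))\
          .map(lambda x: (x, 1))\
          .reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()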

Upvotes: 2
