Fanooos

Reputation: 2818

Spark streaming not reading from local directory

I am trying to write a Spark Streaming application using the Spark Python API.

The application should read text files from a local directory and send them to a Kafka cluster.

When I submit the Python script to the Spark engine, nothing is sent to Kafka at all.

I tried printing the events instead of sending them to Kafka and found that nothing is read.

Here is the code of the script.

#!/usr/lib/python
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from kafka import KafkaProducer
import sys 
import time
reload(sys)
sys.setdefaultencoding('utf8')


producer = KafkaProducer(bootstrap_servers="kafka-b01.css.org:9092,kafka-b02.css.org:9092,kafka-b03.css.org:9092,kafka-b04.css.org:9092,kafka-b05.css.org:9092")


def send_to_kafka(rdd):
    tweets = rdd.collect()
    print ("--------------------------")
    print (tweets)
    print "--------------------------"
    #for tweet in tweets:
    #    producer.send('test_historical_job', value=bytes(tweet))


if __name__ == "__main__":

    conf = SparkConf().setAppName("TestSparkFromPython")

    sc = SparkContext(conf=conf)

    ssc = StreamingContext(sc, 1)

    tweetsDstream = ssc.textFileStream("/tmp/historical/")

    tweetsDstream.foreachRDD(lambda rdd: send_to_kafka(rdd))
    ssc.start()
    ssc.awaitTermination()

I am submitting the script using this command:

./spark-submit --master spark://spark-master:7077 /apps/historical_streamer.py

The output of the print statement is an empty list.

--------------------------
[]
--------------------------

EDIT

Based on this question, I changed the path of the data directory from "/tmp/historical/" to "file:///tmp/historical/".

I also tried to run the job first and then move files into the directory, but unfortunately that did not work either.

Upvotes: 1

Views: 1949

Answers (2)

zero323

Reputation: 330063

File-stream-based sources like fileStream or textFileStream expect data files to:

be created in the dataDirectory by atomically moving or renaming them into the data directory.

If there are no new files in a given window, there is nothing to process, so pre-existing files (which seems to be the case here) won't be read and won't show up in the output.
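
A minimal sketch of how that is typically handled (assuming the job watches /tmp/historical/ and using a hypothetical staging directory /tmp/staging/ on the same filesystem): start the streaming job first, write the file outside the watched directory, then atomically rename it in so it appears as a new file in the next batch.

import os

staging = "/tmp/staging"     # hypothetical staging directory (assumption)
watched = "/tmp/historical"  # the directory passed to textFileStream

if not os.path.isdir(staging):
    os.mkdir(staging)

# Write the file outside the watched directory first...
with open(os.path.join(staging, "tweets.txt"), "w") as f:
    f.write("some tweet\n")

# ...then rename it in. rename() is atomic on the same filesystem,
# so the stream sees a complete new file in the next batch interval.
os.rename(os.path.join(staging, "tweets.txt"),
          os.path.join(watched, "tweets.txt"))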

Upvotes: 1

salvob

Reputation: 1370

Your function:

def send_to_kafka(rdd):
    tweets = rdd.collect()
    print("--------------------------")
    print(tweets)
    print("--------------------------")
    #for tweet in tweets:
    #    producer.send('test_historical_job', value=bytes(tweet))

will collect the whole RDD, but it won't print the contents of the RDD. To do so, you should use:

tweets.foreach(println)

which will output each element of the RDD, as explained in the Spark documentation.
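
As a rough PySpark sketch of the same idea (foreach(println) is the Scala form; print_record below is a hypothetical helper), note that the function runs on the executors, so on a cluster the output ends up in the executor logs rather than on the driver console:

def print_record(record):
    print(record)

def send_to_kafka(rdd):
    # PySpark equivalent of the Scala-style foreach(println):
    # print each element from the executors.
    rdd.foreach(print_record)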

Hope this will help

Upvotes: 0
