swinefish

Reputation: 561

Spark Streaming: How to get the filename of a processed file in Python

I'm sort of a noob to Spark (and also Python honestly) so please forgive me if I've missed something obvious.

I am doing file streaming with Spark and Python. In the first example I did, Spark correctly listens to the given directory and counts word occurrences in the file, so I know that everything works in terms of listening to the directory.

Now I am trying to get the name of the file that is processed for auditing purposes. I read here http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvfmP8OC9jrpVgWsRWfqjMxeYd6sE6EojfdyFy_GaJ3BO43_A@mail.gmail.com%3E that this is no trivial task. I got a possible solution here http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgyCiZbnrd6Y_aG0cBRCVC1u37X8FERSEcHB=tR3A2VGrGrPQ@mail.gmail.com%3E and I have tried implementing it as follows:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fileName(data):
    string = data.toDebugString

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingFileNamePrinter")
    ssc = StreamingContext(sc, 1)
    lines = ssc.textFileStream("file:///test/input/")
    files = lines.foreachRDD(fileName)
    print(files)
    ssc.start()
    ssc.awaitTermination()

Unfortunately, rather than checking the folder every second, it now checks once, outputs 'None' and then just waits, doing nothing. The only difference between this and the code that did work is this line:

files = lines.foreachRDD(fileName)

Before I even worry about getting the filename (tomorrow's problems) can anybody see why this is only checking the directory once?

Thanks in advance M

Upvotes: 3

Views: 2699

Answers (2)

ben othman zied

Reputation: 310

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def get_file_info(rdd):
    file_content = rdd.collect()
    file_name = rdd.toDebugString()
    print(file_name, file_content)


def main():
    sc = SparkContext("local[2]", "deneme")
    ssc = StreamingContext(sc, 1)  # batch interval of 1 second

    lines = ssc.textFileStream('../urne')
    # here is the call
    lines.foreachRDD(lambda rdd: get_file_info(rdd))

    # Split each line into words (textFileStream already yields one record per line)
    words = lines.flatMap(lambda line: line.split(" "))

    # Count each word in each batch
    pairs = words.map(lambda word: (word, 1))

    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    wordCounts.pprint()

    ssc.start()
   
    ssc.awaitTermination()
   

if __name__ == "__main__":
    main()

This prints the debug string followed by the collected file contents, for example:

b'(3) MapPartitionsRDD[237] at textFileStream at NativeMethodAccessorImpl.java:0 []\n | UnionRDD[236] at textFileStream at NativeMethodAccessorImpl.java:0 []\n | file:/some/directory/file0.068513 NewHadoopRDD[231] at textFileStream at NativeMethodAccessorImpl.java:0 []\n | file:/some/directory/file0.069317 NewHadoopRDD[233] at textFileStream at NativeMethodAccessorImpl.java:0 []\n | file:/some/directory/file0.070036 NewHadoopRDD[235] at textFileStream at NativeMethodAccessorImpl.java:0 []' ['6', '3', '4', '3', '6', '0', '1', '7', '10', '2', '0', '0', '1', '1', '10', '8', '7', '7', '0', '8', '8', '9', '7', '2', '9', '1', '5', '8', '9', '9', '0', '6', '0', '4', '3', '4', '8', '5', '8', '10', '5', '2', '3', '6', '10', '2', '1', '0', '4', '3', '1', '8', '2', '10', '4', '0', '4', '4', '1', '4', '3', '1', '2', '5', '5', '3', ]

Use a regex on the debug string to pull out the file names. Here Spark is telling you that three files were read in a single batch of the DStream (one NewHadoopRDD entry per file), so you can work from there.
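To make the regex step concrete, here is a minimal sketch. It assumes the debug string has the layout shown above, with each input file on its own line just before a NewHadoopRDD entry; that layout is not a documented API and may differ between Spark versions. The function name file_names_from_debug is made up for this example, and the sample is a shortened copy of the output above.

```python
import re

def file_names_from_debug(debug):
    """Return the input paths listed in an RDD debug string, in order.

    Hypothetical helper: relies on the (undocumented) debug string layout.
    """
    # Under Python 3, toDebugString() may return bytes, as in the output above.
    if isinstance(debug, bytes):
        debug = debug.decode("utf-8")
    names = []
    for line in debug.splitlines():
        # The path is the whitespace-free token right before "NewHadoopRDD[".
        match = re.search(r"(\S+)\s+NewHadoopRDD\[", line)
        if match:
            names.append(match.group(1))
    return names

# Shortened copy of the debug string printed above.
sample = (
    b"(3) MapPartitionsRDD[237] at textFileStream at NativeMethodAccessorImpl.java:0 []\n"
    b" | UnionRDD[236] at textFileStream at NativeMethodAccessorImpl.java:0 []\n"
    b" | file:/some/directory/file0.068513 NewHadoopRDD[231] at textFileStream at NativeMethodAccessorImpl.java:0 []\n"
    b" | file:/some/directory/file0.069317 NewHadoopRDD[233] at textFileStream at NativeMethodAccessorImpl.java:0 []\n"
    b" | file:/some/directory/file0.070036 NewHadoopRDD[235] at textFileStream at NativeMethodAccessorImpl.java:0 []"
)

print(file_names_from_debug(sample))
```

Inside get_file_info you would call it as file_names_from_debug(rdd.toDebugString()). Paths containing spaces would not be matched correctly by this pattern.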

Upvotes: 0

swinefish

Reputation: 561

So it was a noob error. I'm posting my solution for reference for myself and others.

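As pointed out by @user3689574, I was not returning the debug string from my function (I had also left the parentheses off toDebugString, so it was never actually called). Note that foreachRDD only registers the function and itself returns None, so print(files) shows 'None' regardless of what the function returns.

Next, I was printing the debug output outside of the function, so the print was never part of the foreachRDD. I moved it into the function as follows: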

def fileName(data):
    debug = data.toDebugString()
    print(debug)

This prints the debug information as it should, and continues to listen to the directory, as it should. Changing that fixed my initial problem. In terms of getting the file name, that has become pretty straightforward.

The debug string when there is no change in the directory is as follows:

(0) MapPartitionsRDD[1] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[0] at textFileStream at NativeMethodAccessorImpl.java:-2 []

Which neatly indicates that there is no file. When a file is copied into the directory, the debug output is as follows:

(1) MapPartitionsRDD[42] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[41] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | file:/test/input/test.txt NewHadoopRDD[40] at textFileStream at NativeMethodAccessorImpl.java:-2 []

Which, with a quick regex, gives you the file name with little trouble. Hope this helps somebody else.
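For the auditing use case from the question, something along these lines might work. It is only a sketch: make_audit_callback and the record sink are names invented here, and it assumes the debug string keeps the format shown above, which is not a stable API. It uses the two-argument form of the foreachRDD callback (batch time, RDD) so that each file name can be logged together with its batch time.

```python
import re

# Path token immediately before "NewHadoopRDD[<id>]" in the debug string.
FILE_ENTRY = re.compile(r"(\S+)\s+NewHadoopRDD\[\d+\]")

def make_audit_callback(record):
    """Build a callback suitable for DStream.foreachRDD.

    `record` is a hypothetical sink: any callable taking
    (batch_time, file_name), e.g. a function that writes to an audit log.
    """
    def audit(batch_time, rdd):
        debug = rdd.toDebugString()
        # toDebugString() may return bytes under Python 3.
        if isinstance(debug, bytes):
            debug = debug.decode("utf-8")
        # An empty batch has no NewHadoopRDD entries, so nothing is recorded.
        for file_name in FILE_ENTRY.findall(debug):
            record(batch_time, file_name)
    return audit

# Wiring, assuming the `lines` DStream from the question:
# lines.foreachRDD(make_audit_callback(lambda t, name: print(t, name)))
```

The callback runs on the driver once per batch, so the sink can be an ordinary Python function.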

Upvotes: 3
