Mike Evers

Reputation: 185

PySpark sc.textFile() doesn't load the file completely

I'm starting out with Python Spark (v 1.6.0) on the Cloudera QuickStart Docker container. I successfully put a static .txt file (500 MB) in HDFS under /user/root/access_log.txt.

In pyspark I try to load the file with the following line of Python code:

lines = sc.textFile("hdfs://quickstart.cloudera/user/root/access_log.txt")

This gives me no errors, but I discovered that the file isn't loaded entirely. Also,

lines.max()

does not return the correct last element of the file, even though HDFS reports the correct file size.

Is this a memory problem? My Docker memory is set to 3840 MB. I don't know how to fix this. I look forward to your answers.

Edit:

I counted the elements in my dataset with:

lines.count()

and to my surprise it was correct! This should mean that the file was loaded correctly after all. But the question remains: why doesn't the .max() call return the correct last element?

Does this have something to do with the different tasks?

Edit 2: A few example lines from the .txt file

10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmpics/0000/2229/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 184976
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 60117
10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmmediablock/360/Chacha.jpg HTTP/1.1" 200 109379
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000159.jpg HTTP/1.1" 200 161657

Upvotes: 3

Views: 1132

Answers (1)

zero323

Reputation: 330453

In general max shouldn't return the (...) last element. It might in some cases, if the format used by the log file enforces lexicographic order and you're lucky with the content, but otherwise it just won't happen. Since your data is prefixed with an IP address and uses an unfriendly timestamp format (not, for example, ISO 8601), getting the last element is not something you can expect.
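To see why, compare two lines the way max does. A minimal sketch with made-up log entries (the IPs, paths and timestamps are hypothetical):

# max() on strings uses plain lexicographic comparison, not file position,
# so the "largest" line need not be the last line.
earlier = '99.200.1.5 - - [03/Dec/2011:13:28:10 -0800] "GET /x HTTP/1.1" 200 1'
later = '10.190.174.142 - - [04/Dec/2011:09:00:00 -0800] "GET /y HTTP/1.1" 200 1'

# "1" sorts before "9", so the line that comes later in the file
# compares as smaller, and max() returns the earlier entry.
assert max([earlier, later]) == earlier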

One way to find the last element is to include indices:

from operator import itemgetter

(rdd
    .zipWithIndex()                # Add line number to get (line, no)
    .max(key=itemgetter(1))[0])    # Compare elements using index
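Note that zipWithIndex has to trigger a separate Spark job when the RDD has more than one partition, because it needs the per-partition counts to assign consecutive indices.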

A slightly different approach is to find the last element of each partition and then take the last of these.

from functools import reduce

(rdd
    .mapPartitions(lambda part: reduce(lambda _, x: [x], part, []))
    .collect()[-1])  # Last element per partition; collect() preserves order

or, if the number of partitions is large:

(rdd
    .mapPartitionsWithIndex(
        lambda i, part: reduce(lambda _, x: [(i, x)], part, []))
    .max()[1])  # Take max using tuple ordering
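Python compares tuples element-wise, so the partition index decides the ordering here and the line itself is only a tie-breaker; the result is the last element of the highest-numbered non-empty partition, i.e. the last line of the file.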

Upvotes: 3
