Reputation: 185
I'm getting started with Python Spark (v1.6.0) on the Cloudera QuickStart Docker container. I successfully put a static .txt file (500 MB) into HDFS under /user/root/access_log.txt.
In pyspark I try to load the file with the following line of Python code:
lines = sc.textFile("hdfs://quickstart.cloudera/user/root/access_log.txt")
This gives me no errors, but I discovered that the file doesn't seem to be loaded entirely. Also,
lines.max()
does not return the actual last element of the file, even though HDFS reports the correct file size.
Is this a memory problem? My Docker memory setting is 3840 MB. I don't know how to fix this. I look forward to your answers.
Edit:
I counted the elements in my dataset with:
lines.count()
and to my surprise it was correct! This should mean that my file was loaded correctly. But the question remains why the .max() call doesn't return the last element.
Does this have something to do with the different tasks?
Edit 2: A few example lines from the .txt file
10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmpics/0000/2229/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 184976
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 60117
10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmmediablock/360/Chacha.jpg HTTP/1.1" 200 109379
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000159.jpg HTTP/1.1" 200 161657
Upvotes: 3
Views: 1132
Reputation: 330453
In general, max shouldn't return the (...) last element. It returns the largest element under the natural ordering, which for strings is lexicographic. That might coincide with the last line in some cases, if the format used by the log file enforces lexicographic order and you're lucky with the content; otherwise it just won't happen. Since your data is prefixed with an IP address and uses an unfriendly timestamp format (not, for example, ISO 8601), getting the last element is not something you can expect.
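To make this concrete, here is a minimal plain-Python sketch (no Spark involved) using the four sample lines from the question. It assumes that comparing the strings locally with the built-in max behaves like RDD.max() does on strings, which by default uses the same ordering.

```python
# Sample lines copied from the question, kept in file order.
lines = [
    '10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmpics/0000/2229/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 184976',
    '10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 60117',
    '10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmmediablock/360/Chacha.jpg HTTP/1.1" 200 109379',
    '10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000159.jpg HTTP/1.1" 200 161657',
]

largest = max(lines)   # lexicographically greatest string
last = lines[-1]       # last line in file order

# The two differ: the second line sorts after the fourth because
# "...000163.jpg" > "...000159.jpg" character by character.
print(largest == last)
```

Here the lexicographic maximum is the second line, not the fourth, even though the fourth is last in the file.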
One way to find the last element is to include indices:
from operator import itemgetter
(rdd
    .zipWithIndex()                # Add line number to get (line, no)
    .max(key=itemgetter(1))[0])    # Compare elements using index
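As a rough local illustration of what this computes, the sketch below mimics the (element, index) pairs produced by zipWithIndex with a plain Python list. The toy data is hypothetical and no Spark is used.

```python
from operator import itemgetter

# Hypothetical toy data standing in for the lines of the file.
lines = ["c", "a", "b"]

# Mirror zipWithIndex: pair every element with its position, as (element, index).
indexed = [(line, i) for i, line in enumerate(lines)]

# Comparing on the index (position 1 of each pair) selects the last element,
# regardless of how the strings themselves compare.
last_line = max(indexed, key=itemgetter(1))[0]

# Comparing the strings directly selects the lexicographic maximum instead.
largest_line = max(lines)
```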
A slightly different approach is to find the last element of each partition and then take the last one of these:
from functools import reduce
rdd.mapPartitions(lambda part: reduce(lambda _, x: [x], part, [])).collect()[-1]
or, if the number of partitions is large:
(rdd
    .mapPartitionsWithIndex(
        lambda i, part: reduce(lambda _, x: [(i, x)], part, []))
    .max()[1])  # Take max using tuple ordering
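The fold used in both partition-based variants can be checked locally. The following is only a sketch: it emulates partitions as a hypothetical list of lists (including empty ones) rather than running on Spark, and the helper name last_of is made up for illustration.

```python
from functools import reduce

def last_of(part):
    # Fold that discards the accumulator and keeps only the current element,
    # so the result is [last element], or [] for an empty partition.
    return reduce(lambda _, x: [x], part, [])

# Hypothetical partitions, in order; empty partitions contribute nothing.
partitions = [["a", "b"], [], ["c", "d"], []]

# Emulates rdd.mapPartitions(...).collect(): results are concatenated in partition order.
per_partition_last = [x for part in partitions for x in last_of(iter(part))]
last_line = per_partition_last[-1]

# Emulates the mapPartitionsWithIndex variant: tag each survivor with its
# partition index, then let tuple ordering pick the highest index.
tagged = [
    pair
    for i, part in enumerate(partitions)
    for pair in reduce(lambda _, x: [(i, x)], iter(part), [])
]
last_via_max = max(tagged)[1]
```

Starting the fold from an empty list is what lets empty partitions drop out cleanly instead of raising an error.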
Upvotes: 3