Reputation: 4319
I'm new to Spark and not quite sure how to ask this (which terms to use, etc.), so here's a picture of what I'm conceptually trying to accomplish:
I have lots of small, individual .txt "ledger" files (e.g., line-delimited files with a timestamp and attribute values at that time).
I'd like to:
Read each "ledger" file into an individual data frame (read: NOT combining them into one big data frame);
Perform some basic calculations on each individual data frame, which result in a row of new data values; and then
Merge all the individual result rows into a final object & save it to disk in a line-delimited file.
It seems like nearly every answer I find (when googling related terms) is about loading multiple files into a single RDD or DataFrame, but I did find this Scala code:
val data = sc.wholeTextFiles("HDFS_PATH")
val files = data.map { case (filename, content) => filename }

def doSomething(file: String) = {
  println(file)
  // your logic of processing a single file comes here
  val logData = sc.textFile(file)
  val numAs = logData.filter(line => line.contains("a")).count()
  println("Lines with a: %s".format(numAs))
  // save rdd of single file processed data to hdfs comes here
}

files.collect.foreach(filename => doSomething(filename))
... but:
A. I can't tell if this parallelizes the read/analyze operation, and
B. I don't think it provides for merging the results into a single object.
Any direction or recommendations are greatly appreciated!
Update
It seems like what I'm trying to do (run a script on multiple files in parallel and then combine the results) might require something like thread pools (?); a rough sketch of that idea follows the ledger example below.
For clarity, here's an example of the calculation I'd like to perform on the DataFrame created by reading in the "ledger" file:
from dateutil.relativedelta import relativedelta
from datetime import datetime
from pyspark.sql.functions import to_timestamp
# Read "ledger file"
df = spark.read.json("/path/to/ledger-filename.txt")
# Convert string ==> timestamp & sort
df = (df.withColumn("timestamp", to_timestamp(df.timestamp, 'yyyy-MM-dd HH:mm:ss'))).sort('timestamp')
columns_with_age = ("location", "status")
columns_without_age = ("wh_id",)  # trailing comma keeps this a tuple

# Collect the sorted rows once and get the most-recent values (from the last row of the df)
rows = df.collect()
row_count = len(rows)
last_row = rows[row_count - 1]

# Create an empty "final row" dictionary
final_row = {}

# For each column for which we want to calculate an age value ...
for c in columns_with_age:
    # Initialize loop values
    target_value = last_row[c]
    final_row[c] = target_value
    timestamp_at_lookback = last_row["timestamp"]
    look_back = 1
    different = False
    # Walk backwards until the value changes (or we run out of rows)
    while not different and look_back < row_count:
        previous_row = rows[row_count - 1 - look_back]
        if previous_row[c] == target_value:
            timestamp_at_lookback = previous_row["timestamp"]
            look_back += 1
        else:
            different = True
    # At this point, a difference has been found, so calculate the age
    final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days
Thus, a ledger like this:
+---------+------+-------------------+-----+
| location|status| timestamp|wh_id|
+---------+------+-------------------+-----+
| PUTAWAY| I|2019-04-01 03:14:00| 20|
|PICKABLE1| X|2019-04-01 04:24:00| 20|
|PICKABLE2| X|2019-04-01 05:33:00| 20|
|PICKABLE2| A|2019-04-01 06:42:00| 20|
| HOTPICK| A|2019-04-10 05:51:00| 20|
| ICEXCEPT| A|2019-04-10 07:04:00| 20|
| ICEXCEPT| X|2019-04-11 09:28:00| 20|
+---------+------+-------------------+-----+
Would reduce to (assuming the calculation was run on 2019-04-14):
{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }
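To make the thread pool idea concrete, here's a rough, untested sketch of what I imagine it might look like, assuming the calculation above is wrapped in a compute_final_row(path) function (a placeholder name) and a SparkSession named spark already exists:
import json
from concurrent.futures import ThreadPoolExecutor

def compute_final_row(path):
    # Read one ledger file into its own DataFrame ...
    df = spark.read.json(path)
    # ... the timestamp conversion, sort, and age calculation shown above
    # would go here and populate final_row ...
    final_row = {"_id": path}  # placeholder so the sketch stands on its own
    return final_row

# Placeholder list of the individual ledger file paths
ledger_paths = ["/path/to/ledger-1.txt", "/path/to/ledger-2.txt"]

# Run the per-file calculation concurrently on the driver; each call still
# submits its own (distributed) Spark jobs
with ThreadPoolExecutor(max_workers=8) as pool:
    result_rows = list(pool.map(compute_final_row, ledger_paths))

# Merge the individual result rows and save them as a line-delimited file
with open("/path/to/results.jsonl", "w") as out:
    for row in result_rows:
        out.write(json.dumps(row) + "\n")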
Upvotes: 1
Views: 5006
Reputation: 1114
You could fetch the file paths from HDFS:
import org.apache.hadoop.fs.{FileSystem,Path}
val files = FileSystem.get(sc.hadoopConfiguration)
  .listStatus(new Path(your_path))
  .map(x => x.getPath)
  .map(x => "hdfs://" + x.toUri().getRawPath())
Then create a separate DataFrame for each path:
val arr_df = files.map(spark.read.format("csv").option("delimiter", ",").option("header", true).load(_))
Then apply your filter or any other transformation to each DataFrame before unioning them into one:
val df = arr_df.map(x => x.where(your_filter)).reduce(_ union _)
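If you are working in PySpark, a rough sketch of the same pattern could look like this (the paths and the filter are placeholders):
from functools import reduce
from pyspark.sql import DataFrame

# Placeholder list of paths; in practice this could come from listing the
# directory, e.g. via the Hadoop FileSystem API as above
paths = ["/data/ledgers/a.txt", "/data/ledgers/b.txt"]

# One DataFrame per file, each transformed on its own ...
dfs = [spark.read.json(p).where("status = 'X'") for p in paths]  # placeholder filter

# ... then unioned into a single DataFrame
combined = reduce(DataFrame.union, dfs)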
Upvotes: 0
Reputation: 4540
Using wholeTextFiles is not recommended, as it loads the full file into memory at once. If you really want to create an individual data frame per file, you can simply use the full path instead of a directory. However, this is not recommended and will most likely lead to poor resource utilisation. Instead, consider using input_file_name:
https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/functions.html#input_file_name--
For example:
spark
  .read
  .textFile("path/to/files")
  .withColumn("file", input_file_name())
  .filter($"value" like "%a%")
  .groupBy($"file")
  .agg(count($"value"))
  .show(10, false)
+----------------------------+------------+
|file |count(value)|
+----------------------------+------------+
|path/to/files/1.txt |2 |
|path/to/files/2.txt |4 |
+----------------------------+------------+
This way the files can be processed individually and then combined later.
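In PySpark, an equivalent sketch (untested, using input_file_name from pyspark.sql.functions) would be:
from pyspark.sql.functions import input_file_name, count

(spark.read.text("path/to/files")
    .withColumn("file", input_file_name())   # tag each line with its source file
    .filter("value like '%a%'")
    .groupBy("file")                         # aggregate per file
    .agg(count("value").alias("lines_with_a"))
    .show(10, truncate=False))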
Upvotes: 2