Dan

Reputation: 4319

Spark: Load multiple files, analyze individually, merge results, and save

I'm new to Spark and not quite sure how to ask this (which terms to use, etc.), so here's a picture of what I'm conceptually trying to accomplish:

[Conceptual need diagram]

I have lots of small, individual .txt "ledger" files (e.g., line-delimited files with a timestamp and attribute values at that time).

I'd like to:

  1. Read each "ledger" file into its own data frame (read: NOT combining them into one big data frame);

  2. Perform some basic calculations on each individual data frame, which result in a row of new data values; and then

  3. Merge all the individual result rows into a final object & save it to disk in a line-delimited file.

It seems like nearly every answer I find (when googling related terms) is about loading multiple files into a single RDD or DataFrame, but I did find this Scala code:

val data = sc.wholeTextFiles("HDFS_PATH")
val files = data.map { case (filename, content) => filename }

def doSomething(file: String) = {
  println(file)

  // your logic of processing a single file comes here
  val logData = sc.textFile(file)
  val numAs = logData.filter(line => line.contains("a")).count()
  println("Lines with a: %s".format(numAs))

  // save rdd of single file processed data to hdfs comes here
}

files.collect.foreach(filename => doSomething(filename))

... but:

A. I can't tell if this parallelizes the read/analyze operation, and

B. I don't think it provides for merging the results into a single object.

Any direction or recommendations are greatly appreciated!

Update

It seems like what I'm trying to do (run a script on multiple files in parallel and then combine results) might require something like thread pools (?).
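
Here's a rough, untested sketch of the shape I have in mind (process_ledger, the /path/to/ledgers/ glob, and spark as the active SparkSession are all placeholder assumptions on my part):

from glob import glob
from multiprocessing.pool import ThreadPool

def process_ledger(path):
    """Read one ledger file and reduce it to a single result dict."""
    df = spark.read.json(path)
    # The per-file calculation shown below would go here; as a placeholder,
    # just record the file and its row count.
    return {"_id": path, "row_count": df.count()}

# Assumes the ledger files sit on a path the driver can list directly;
# for HDFS this would need a Hadoop FileSystem listing instead.
paths = glob("/path/to/ledgers/*.txt")

# Spark job submission is thread-safe, so a thread pool lets several
# per-file jobs be scheduled on the cluster concurrently.
with ThreadPool(8) as pool:
    results = pool.map(process_ledger, paths)

# 'results' is now a plain Python list of dicts, one per ledger file.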

For clarity, here's an example of the calculation I'd like to perform on the DataFrame created by reading in the "ledger" file:

from dateutil.relativedelta import relativedelta
from datetime import datetime
from pyspark.sql.functions import to_timestamp

# Read "ledger" file
df = spark.read.json("/path/to/ledger-filename.txt")

# Convert string ==> timestamp & sort
df = df.withColumn("timestamp", to_timestamp(df.timestamp, 'yyyy-MM-dd HH:mm:ss')).sort('timestamp')

columns_with_age = ("location", "status")
columns_without_age = ("wh_id",)  # trailing comma needed: ("wh_id") alone is just a string

# Collect the (small) ledger once; the last row holds the most-recent values
rows = df.collect()
row_count = len(rows)
last_row = rows[row_count - 1]

# Create an empty "final row" dictionary
final_row = {}

# For each column for which we want to calculate an age value ...
for c in columns_with_age:

    # Initialize loop values
    target_value = last_row[c]
    final_row[c] = target_value
    timestamp_at_lookback = last_row["timestamp"]
    look_back = 1
    different = False

    # Walk backwards until the value changes (or the ledger runs out of rows)
    while not different and look_back < row_count:
        previous_row = rows[row_count - 1 - look_back]
        if previous_row[c] == target_value:
            timestamp_at_lookback = previous_row["timestamp"]
            look_back += 1
        else:
            different = True

    # A difference (or the start of the ledger) has been reached, so calculate the age
    final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days

Thus, a ledger like this:

+---------+------+-------------------+-----+
| location|status|          timestamp|wh_id|
+---------+------+-------------------+-----+
|  PUTAWAY|     I|2019-04-01 03:14:00|   20|
|PICKABLE1|     X|2019-04-01 04:24:00|   20|
|PICKABLE2|     X|2019-04-01 05:33:00|   20|
|PICKABLE2|     A|2019-04-01 06:42:00|   20|
|  HOTPICK|     A|2019-04-10 05:51:00|   20|
| ICEXCEPT|     A|2019-04-10 07:04:00|   20|
| ICEXCEPT|     X|2019-04-11 09:28:00|   20|
+---------+------+-------------------+-----+

Would reduce to (assuming the calculation was run on 2019-04-14):

{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }
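
If each file reduces to a dict like that, I imagine the final merge-and-save (step 3) could look roughly like this (again untested; it assumes results is the list of per-file dicts from the sketch above, and /path/to/final-output is a placeholder):

from pyspark.sql import Row

# 'results' is assumed to be the list of per-file result dicts built above
final_df = spark.createDataFrame([Row(**r) for r in results])

# Spark's JSON writer emits one JSON object per line (i.e. line-delimited);
# coalesce(1) keeps the output down to a single part file.
final_df.coalesce(1).write.mode("overwrite").json("/path/to/final-output")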

Upvotes: 1

Views: 5006

Answers (2)

mikeL

Reputation: 1114

You could fetch the file paths in HDFS:

import org.apache.hadoop.fs.{FileSystem, Path}

val files = FileSystem.get(sc.hadoopConfiguration)
  .listStatus(new Path(your_path))
  .map(_.getPath)
  .map(p => "hdfs://" + p.toUri().getRawPath())

Create a separate data frame for each path:

val arr_df = files.map(spark.read.format("csv").option("delimiter", ",").option("header", true).load(_))

Then apply your filter or any other transformation to each data frame before unioning them into one:

val df = arr_df.map(x => x.where(your_filter)).reduce(_ union _)

Upvotes: 0

ollik1

Reputation: 4540

Using wholeTextFiles is not recommended as it loads the full file into memory at once. If you really want to create an individual data frame per file, you can simply use the full path instead of a directory. However, this is not recommended and will most likely lead to poor resource utilisation. Instead, consider using input_file_name (https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/functions.html#input_file_name--).

For example:

import org.apache.spark.sql.functions.{count, input_file_name}
import spark.implicits._  // for the $"..." column syntax

spark.read
  .textFile("path/to/files")
  .withColumn("file", input_file_name())
  .filter($"value" like "%a%")
  .groupBy($"file")
  .agg(count($"value"))
  .show(10, false)
+----------------------------+------------+
|file                        |count(value)|
+----------------------------+------------+
|path/to/files/1.txt         |2           |
|path/to/files/2.txt         |4           |
+----------------------------+------------+

This way the files can be processed individually and then combined later.

Upvotes: 2
