Best practices for reading a large number of files in Apache Spark

I want to process a large number of .DAT files that contain per-second measurements (each line represents one second) from an electrical device.

I have more than 5000 files, each about 160 KiB (not that much, actually), but I am struggling to find an efficient or recommended way to handle this kind of problem: creating an object that summarizes the content of each file.

This is my file structure:

feeder/
     CT40CA18_20190101_000000_60P_40000258.DAT
     CT40CA18_20190101_010000_60P_40000258.DAT
     CT40CA18_20190101_020000_60P_40000258.DAT
     CT40CA18_20190101_030000_60P_40000258.DAT
     CT40CA18_20190101_040000_60P_40000258.DAT
     ....
     ....
     ....
     CT40CA18_20190812_010000_60P_40000258.DAT

My current code, in Java with Spark 2.1.1, is:

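import model.FeederFile;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import transformations.map.ParseFeederFile;

public class Playground {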

    private static final SparkSession spark = new SparkSession
            .Builder()
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {

        Dataset<FeederFile> feederFileDataset = spark
                .read()
                .textFile("resources/CT40CA18/feeder/*.DAT")
                .map(new ParseFeederFile(), Encoders.bean(FeederFile.class));

    }
}

ParseFeederFile is:

package transformations.map;

import model.FeederFile;
import org.apache.spark.api.java.function.MapFunction;

public class ParseFeederFile implements MapFunction<String, FeederFile> {

    private StringBuilder fileContent;

    public ParseFeederFile() {
        fileContent = new StringBuilder();
    }

    @Override
    public FeederFile call(String s) throws Exception {
        return new FeederFile().withContent(fileContent.append(s).append("\n").toString());
    }
}

and FeederFile is:

package model;

import java.io.Serializable;

public class FeederFile implements Serializable {

    private String content;

    public FeederFile() {}

    public void setContent(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }

    public FeederFile withContent(final String content) {
        this.content = content;
        return this;
    }
}

The problem is that when map invokes call, the string passed in is a single line of a .DAT file, so a huge number of FeederFile objects is created unnecessarily. Another problem is that textFile does not distinguish between files, so everything is appended to the same object (i.e., the content of all files ends up in the content attribute of the FeederFile class).

I wrote it this naive way just to retrieve all the content. I do not actually want the content itself; I want to create an object that summarises each .DAT file, with things like the line count and some statistics based on the data.

Does anyone have an idea of how I can create one FeederFile per .DAT file?

Thank you in advance for any help you can provide.

Upvotes: 1

Views: 105

Answers (1)

Vladislav Varslavans

Reputation: 2934

You could use:

sparkContext.wholeTextFiles(...)

SparkContext's wholeTextFiles method (sc.wholeTextFiles in the Spark shell) creates a pair RDD with one record per file. The key is the file name with its full path, i.e. the path of a file matched by an input like “hdfs://aa1/data/src_data/stage/test_files/collection_vk/current_snapshot/*”. The value is the whole content of that file as a single String.
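
As a minimal sketch of how this could look from Java, assuming Spark 2.x with Java 8 lambdas: the names FeederSummaries, FileSummary and summarise are hypothetical, and the line count only stands in for whatever statistics you actually need, since the layout of a .DAT line is not shown in the question.

```java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;

public class FeederSummaries {

    // Hypothetical summary bean: one instance per .DAT file.
    // Encoders.bean needs a public no-arg constructor plus getters and setters.
    public static class FileSummary implements Serializable {
        private String path;
        private long lineCount;

        public FileSummary() {}

        public FileSummary(String path, long lineCount) {
            this.path = path;
            this.lineCount = lineCount;
        }

        public String getPath() { return path; }
        public void setPath(String path) { this.path = path; }
        public long getLineCount() { return lineCount; }
        public void setLineCount(long lineCount) { this.lineCount = lineCount; }
    }

    // Plain Java, no Spark involved: builds the summary of a single file.
    // Trailing empty lines are not counted. Real statistics would be added
    // here once the format of a line is known (assumption: not shown here).
    public static FileSummary summarise(String path, String content) {
        long lines = content.isEmpty() ? 0 : content.split("\r?\n").length;
        return new FileSummary(path, lines);
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("feeder-summaries")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // One (path, content) pair per file, instead of one record per line.
        // Same path as in the question; a plain directory path also works.
        JavaPairRDD<String, String> files =
                jsc.wholeTextFiles("resources/CT40CA18/feeder/*.DAT");

        // Exactly one summary object is created per file.
        JavaRDD<FileSummary> summaries =
                files.map(pair -> summarise(pair._1(), pair._2()));

        // Optional: back to a typed Dataset, as in the question's code.
        Dataset<FileSummary> dataset =
                spark.createDataset(summaries.rdd(), Encoders.bean(FileSummary.class));
        dataset.show(false);

        spark.stop();
    }
}
```

Each file is loaded into memory as one String, which should be fine for files of about 160 KiB. Keeping summarise free of Spark types also means the per-file logic can be unit tested without starting a Spark session.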

Upvotes: 1
