Reputation: 3121
I have a TSV file, where the first line is the header. I want to create a JavaPairRDD from this file. Currently, I'm doing so with the following code:
TsvParser tsvParser = new TsvParser(new TsvParserSettings());

List<String[]> allRows;
List<String> headerRow;

try (BufferedReader reader = new BufferedReader(new FileReader(myFile))) {
    allRows = tsvParser.parseAll(reader);
    // Removes the header row
    headerRow = Arrays.asList(allRows.remove(0));
}

JavaPairRDD<String, MyObject> myObjectRDD = javaSparkContext
        .parallelize(allRows)
        .mapToPair(row -> new Tuple2<>(row[0], myObjectFromArray(row)));
I was wondering if there was a way to have the javaSparkContext read and process the file directly instead of splitting the operation into two parts.
EDIT: This is not a duplicate of "How do I convert csv file to rdd", because I'm looking for an answer in Java, not Scala.
Upvotes: 2
Views: 6791
Reputation: 5546
Apache Spark 2.x has a built-in CSV reader, so you don't have to use https://github.com/databricks/spark-csv:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 *
 * @author cpu11453local
 */
public class Main {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("meowingful")
                .getOrCreate();

        Dataset<Row> df = spark.read()
                .option("header", "true")
                .option("delimiter", "\t")
                .csv("hdfs://127.0.0.1:9000/data/meow_data.csv");

        df.show();
    }
}
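Since the question asks for a JavaPairRDD rather than a Dataset, here is a minimal sketch of the conversion, keying each row on its first column; myObjectFromRow is a hypothetical stand-in for however you build MyObject from a Row:

import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Sketch only: key each row on its first column. myObjectFromRow is a
// placeholder for your own Row -> MyObject mapping.
JavaPairRDD<String, MyObject> myObjectRDD = df
        .javaRDD()
        .mapToPair(row -> new Tuple2<>(row.getString(0), myObjectFromRow(row)));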
And the Maven pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.meow.meowingful</groupId>
    <artifactId>meowingful</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>
</project>
Upvotes: 1
Reputation: 154
Use https://github.com/databricks/spark-csv:
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = new SQLContext(sc);

DataFrame df = sqlContext.read()
        .format("com.databricks.spark.csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("delimiter", "\t")
        .load("cars.csv");

df.select("year", "model").write()
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .save("newcars.csv");
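For this to work on Spark 1.x, the spark-csv package has to be on the classpath. One way is a Maven dependency like the one below; the exact version is an assumption, so pick the one matching your Scala/Spark build:

<!-- Assumed coordinates for the Databricks CSV reader on Spark 1.x;
     adjust the Scala suffix and version to your build. -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.11</artifactId>
    <version>1.5.0</version>
</dependency>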
Upvotes: 3
Reputation: 15297
Try the code below to read a CSV file and create a JavaPairRDD.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class SparkCSVReader {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("CSV Reader");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> allRows = sc.textFile("c:\\temp\\test.csv"); // read csv file
        String header = allRows.first(); // take out header
        JavaRDD<String> filteredRows = allRows.filter(row -> !row.equals(header)); // filter header
        JavaPairRDD<String, MyCSVFile> filteredRowsPairRDD = filteredRows.mapToPair(parseCSVFile); // create pair

        filteredRowsPairRDD.foreach(data -> {
            System.out.println(data._1() + " ### " + data._2().toString()); // print row and object
        });

        sc.stop();
        sc.close();
    }

    private static PairFunction<String, String, MyCSVFile> parseCSVFile = row -> {
        String[] fields = row.split(",");
        // keys on the whole line; use fields[0] instead if you want the first column
        return new Tuple2<>(row, new MyCSVFile(fields[0], fields[1], fields[2]));
    };
}
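The snippet assumes a MyCSVFile value class of your own. For completeness, here is a minimal sketch; the field names are placeholders. It has to be Serializable, because Spark ships it to the executors:

import java.io.Serializable;

// Hypothetical value class assumed by the snippet above; rename the
// fields to match your actual columns.
public class MyCSVFile implements Serializable {
    private final String field1;
    private final String field2;
    private final String field3;

    public MyCSVFile(String field1, String field2, String field3) {
        this.field1 = field1;
        this.field2 = field2;
        this.field3 = field3;
    }

    @Override
    public String toString() {
        return field1 + "," + field2 + "," + field3;
    }
}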
You can also use Databricks spark-csv (https://github.com/databricks/spark-csv). Its functionality is built into Spark as of 2.0.0.
Upvotes: 1
Reputation: 6289
I'm the author of uniVocity-parsers, so I can't help you much with Spark, but I believe something like this will work for you:
parserSettings.setHeaderExtractionEnabled(true); // captures the header row
parserSettings.setProcessor(new AbstractRowProcessor() {
    @Override
    public void rowProcessed(String[] row, ParsingContext context) {
        String[] headers = context.headers(); // not sure if you need them
        // process your stuff: hand the row over to Spark here, e.g. by
        // accumulating rows and building the pair RDD afterwards with
        // javaSparkContext.parallelize(rows)
        //         .mapToPair(r -> new Tuple2<>(r[0], myObjectFromArray(r)));
    }
});
If you want to parallelize the processing of each row, you can wrap it in a ConcurrentRowProcessor:
parserSettings.setProcessor(new ConcurrentRowProcessor(new AbstractRowProcessor() {
    @Override
    public void rowProcessed(String[] row, ParsingContext context) {
        String[] headers = context.headers(); // not sure if you need them
        // process your stuff, as above.
    }
}, 1000)); // 1000 rows loaded in memory.
Then just call parse:
new TsvParser(parserSettings).parse(myFile);
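For reference, here is how the whole thing could fit together with the asker's original Spark step, assuming it's acceptable to collect all rows in memory first (RowListProcessor ships with uniVocity-parsers; myObjectFromArray is the asker's own helper):

import com.univocity.parsers.common.processor.RowListProcessor;
import com.univocity.parsers.tsv.TsvParser;
import com.univocity.parsers.tsv.TsvParserSettings;
import scala.Tuple2;
import java.util.List;

// Sketch under the assumptions above: let the parser consume the header,
// collect the data rows, then hand them to Spark in one go.
TsvParserSettings settings = new TsvParserSettings();
settings.setHeaderExtractionEnabled(true); // header row is consumed, not returned

RowListProcessor rowProcessor = new RowListProcessor();
settings.setProcessor(rowProcessor);

new TsvParser(settings).parse(myFile);

List<String[]> rows = rowProcessor.getRows(); // data rows only
JavaPairRDD<String, MyObject> myObjectRDD = javaSparkContext
        .parallelize(rows)
        .mapToPair(row -> new Tuple2<>(row[0], myObjectFromArray(row)));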
Hope this helps!
Upvotes: 0