Reputation: 371
I have the following class, which loads a headerless CSV file using the Spark Data API.
The problem is that I cannot get the SparkSession to accept a StructType schema that should define each column. The resulting DataFrame has unnamed columns of String type.
public class CsvReader implements java.io.Serializable {

    public CsvReader(StructType builder) {
        this.builder = builder;
    }

    private StructType builder;

    SparkConf conf = new SparkConf().setAppName("csvParquet").setMaster("local");
    // create Spark Context
    SparkContext context = new SparkContext(conf);
    // create Spark Session
    SparkSession sparkSession = new SparkSession(context);

    Dataset<Row> df = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", false)
            //.option("inferSchema", true)
            .schema(builder)
            .load("/Users/Chris/Desktop/Meter_Geocode_Data.csv"); //TODO: CMD line arg

    public void printSchema() {
        System.out.println(builder.length());
        df.printSchema();
    }

    public void printData() {
        df.show();
    }

    public void printMeters() {
        df.select("meter").show();
    }

    public void printMeterCountByGeocode_result() {
        df.groupBy("geocode_result").count().show();
    }

    public Dataset<Row> getDataframe() {
        return df;
    }
}
Resulting dataframe schema is:
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
|-- _c4: string (nullable = true)
|-- _c5: string (nullable = true)
|-- _c6: string (nullable = true)
|-- _c7: string (nullable = true)
|-- _c8: string (nullable = true)
|-- _c9: string (nullable = true)
|-- _c10: string (nullable = true)
|-- _c11: string (nullable = true)
|-- _c12: string (nullable = true)
|-- _c13: string (nullable = true)
Debugger shows that the 'builder' StructType is correctly defined:
0 = {StructField@4904} "StructField(geocode_result,DoubleType,false)"
1 = {StructField@4905} "StructField(meter,StringType,false)"
2 = {StructField@4906} "StructField(orig_easting,StringType,false)"
3 = {StructField@4907} "StructField(orig_northing,StringType,false)"
4 = {StructField@4908} "StructField(temetra_easting,StringType,false)"
5 = {StructField@4909} "StructField(temetra_northing,StringType,false)"
6 = {StructField@4910} "StructField(orig_address,StringType,false)"
7 = {StructField@4911} "StructField(orig_postcode,StringType,false)"
8 = {StructField@4912} "StructField(postcode_easting,StringType,false)"
9 = {StructField@4913} "StructField(postcode_northing,StringType,false)"
10 = {StructField@4914} "StructField(distance_calc_method,StringType,false)"
11 = {StructField@4915} "StructField(distance,StringType,false)"
12 = {StructField@4916} "StructField(geocoded_address,StringType,false)"
13 = {StructField@4917} "StructField(geocoded_postcode,StringType,false)"
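For reference, a StructType matching the debugger output above can be built with Spark's DataTypes factory methods (a minimal sketch; this is not necessarily how the original schema was constructed):

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    // (name, type, nullable) for each column, in CSV column order
    StructType schema = DataTypes.createStructType(new StructField[]{
            DataTypes.createStructField("geocode_result", DataTypes.DoubleType, false),
            DataTypes.createStructField("meter", DataTypes.StringType, false),
            // ... the remaining twelve fields follow the same pattern
    });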
What am I doing wrong? Any help massively appreciated!
Upvotes: 1
Views: 692
Reputation: 15297
The df field initializer runs before the constructor body, so builder is still null when .schema(builder) is called, and Spark falls back to its default unnamed string columns. Declare the variable Dataset<Row> df and move the CSV-reading code inside the getDataframe() method, like below.
private Dataset<Row> df = null;

public Dataset<Row> getDataframe() {
    df = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", false)
            //.option("inferSchema", true)
            .schema(builder)
            .load("src/main/java/resources/test.csv"); //TODO: CMD line arg
    return df;
}
Now you can call it like this:

    CsvReader cr = new CsvReader(schema);
    Dataset<Row> df = cr.getDataframe();
    cr.printSchema();
I would suggest you redesign your class. One option is to pass df to the other methods as a parameter. Also, if you are using Spark 2.0, you don't need SparkConf at all; refer to the documentation on creating a SparkSession.
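For example, a SparkSession can be created directly with its builder (a minimal sketch; the app name and master values are illustrative):

    import org.apache.spark.sql.SparkSession;

    // getOrCreate() returns an existing session or builds a new one
    SparkSession sparkSession = SparkSession.builder()
            .appName("csvParquet")
            .master("local")
            .getOrCreate();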
Upvotes: 3
Reputation: 155
You should initialize df in the constructor if you want it to be built from builder, or move the initialization into a member function.
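A minimal sketch of the constructor approach, reusing the fields and path from the question (the schema is assigned before the DataFrame is created, so it is no longer null):

    private StructType builder;
    private Dataset<Row> df; // no field initializer

    public CsvReader(StructType builder) {
        this.builder = builder; // assign the schema first
        this.df = sparkSession
                .read()
                .format("com.databricks.spark.csv")
                .option("header", false)
                .schema(builder)
                .load("/Users/Chris/Desktop/Meter_Geocode_Data.csv");
    }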
Upvotes: 0