Reputation: 1775
I am trying to process the LogFile. first i read the log file and split these file as per my requirement and saved each column into separate JavaRDD. Now i need to convert these JavaRDD's to DataFrames for future operations. This is the code what i tried so far:
SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> diskfile = sc.textFile("/Users/karuturi/Downloads/log.txt");
JavaRDD<String> urlrdd=diskfile.flatMap(line -> Arrays.asList(line.split("\t")[0]));
System.out.println(urlrdd.take(1));
SQLContext sql = new SQLContext(sc);
and this is the way how i am trying to convert JavaRDD into DataFrame:
DataFrame fileDF = sqlContext.createDataFrame(urlRDD, Model.class);
But the above line is not working.I confusing about Model.class.
can anyone suggest me.
Thanks.
Upvotes: 13
Views: 35010
Reputation: 12991
You can do something like (I am converting on the fly from scala so excuse any typos):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> rowRDD = urlrdd.map(new Function<String, Row>() {
@Override
public Row call(String record) throws Exception {
return RowFactory.create(record());
}
}
// now you wish to create the target schema. This is basically a list of
// fields (each field would be a column) which you are adding to a StructType
List<StructField> fields = new ArrayList<>();
StructField field = DataTypes.createStructField("url", DataTypes.StringType, true);
fields.add(field);
StructType schema = DataTypes.createStructType(fields);
// now you can create the dataframe:
DataFrame df= sqlContext.createDataFrame(rowRDD, schema);
A couple additional notes:
Why are you flatmaping when you are only taking the first element? You could have simply done:
JavaRDD<String> urlrdd=diskfile.flatMap(line -> line.split("\t")[0]);
I assume in real life you would want to remove the '[' from the url (you can easily do this in the map).
If you are moving to spark 2.0 or later then instead of sqlContext you should be using spark session (spark).
You can create a single dataframe with all columns. You can do this by adding all fields to the schema (i.e. instead of just doing a single add to the fields add all of them). Instead of using urlrdd, use diskfile and do the split inside the "public Row call" creation. This would be something like this:
JavaRDD<Row> rowRDD = diskfile.map(new Function<String, Row>() {
@override public Row call(String record) throws Exception {
String[] recs = record.split("\t")
return RowFactory.create(recs[0], recs[1], ...);
}
});
You can create it directly: Just use
sqlContext.read.option("sep","\t").csv.load(filename,schema)
Upvotes: 6
Reputation: 35404
Imports:
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Create a POJO class for URL. I'd recommend you to write for Log line which consists of url, date, time, method, target,.. etc as members
public static class Url implements Serializable {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
Create an RDD of Url objects from a text file
JavaRDD<Url> urlsRDD = spark.read()
.textFile("/Users/karuturi/Downloads/log.txt")
.javaRDD()
.map(new Function<String, Url>() {
@Override
public Url call(String line) throws Exception {
String[] parts = line.split("\\t");
Url url = new Url();
url.setValue(parts[0].replaceAll("[", ""));
return url;
}
});
Create DataFrame from RDD
Dataset<Row> urlsDF = spark.createDataFrame(urlsRDD, Url.class);
RDD to DataFrame - Spark 2.0
RDD to DataFrame - Spark 1.6
Upvotes: 31
Reputation: 2281
Just flatmap your data according to 7 column table and use code snippet below
String[] columns = new String[7] {"clumn1","column2","column3","column4","column5","column6","column7"};
List<String> tableColumns = Arrays.asList(columns);
StrucType schema = createSchema(tableColumns);
public StructType createSchema(List<String> tableColumns){
List<StructField> fields = new ArrayList<StructField>();
for(String column : tableColumns){
fields.add(DataTypes.createStructField(column, DataTypes.StringType, true));
}
return DataTypes.createStructType(fields);
}
sqlContext.createDataFrame(urlRDD, schema);
Upvotes: 4
Reputation: 2294
You can directly read the file using sqlContext directly
Use read method of sqlContext
For more info you can follow this link
https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#creating-dataframes
Or you can import the
import sqlContext.implicits.*;
Then use toDF()
method on rdd to convert into dataframe.
Upvotes: 0