Reputation: 423
Can anyone tell me how to write SQL queries against a .csv file using spark-shell?
What I have achieved so far is reading a .csv file with the Databricks spark-csv library and creating a DataFrame, as shown below:
./spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("mylocalpath.csv")
Then I can do df.printSchema() and other DataFrame operations without any problem. But I was wondering how I can write some SQL queries against it?
I saw the instructions at http://spark.apache.org/docs/latest/sql-programming-guide.html, which mention something about Programmatically Specifying the Schema. I followed that procedure, just reading a .csv file instead of a text file, but when I did
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
I got an error saying value split is not a member of org.apache.spark.sql.Row. How can I fix this problem?
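For reference, the surrounding code I adapted from the guide looks roughly like this (people here is just the CSV loaded through spark-csv, so each element is already a Row rather than a String):
import org.apache.spark.sql.Row
// people is a DataFrame loaded through spark-csv, not an RDD[String] as in the guide's textFile example
val people = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .load("mylocalpath.csv")
// this is the line that fails, because Row has no split method
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))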
And if there is an easier method to write SQL queries, please let me know. What I ultimately want to do is something simple like selecting two columns, one for id and one for price, and returning the highest price.
df.printSchema() looks like this:
|-- TAXROLL_NUMBER: string (nullable = true)
|-- BUILDING_NAME: string (nullable = true)
|-- ASSESSED_VALUE: string (nullable = true)
|-- STREET_NAME: string (nullable = true)
|-- POSTAL_CODE: string (nullable = true)
|-- CITY: string (nullable = true)
|-- BUILD_YEAR: string (nullable = true)
|-- Lon: string (nullable = true)
|-- Lat: string (nullable = true)
Upvotes: 1
Views: 1800
Reputation: 440
Java code, Spark 2.0.0:
package com.example.SparkReadFile;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Driver
{
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("Csv reader")
                .master("local")
                // .enableHiveSupport()
                .getOrCreate();

        Dataset<Row> df = spark.read()
                .format("csv")
                .option("header", "true")
                .option("nullValue", "")
                .csv("file:///Users/karuturi/Desktop/sample.csv");

        // temp table registration (createOrReplaceTempView is the non-deprecated equivalent in Spark 2.x)
        df.registerTempTable("people");

        Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
        sqlDF.show();
    }
}
Upvotes: 0
Reputation: 1718
I'm not sure I follow you completely, but maybe all you need is
df.registerTempTable("TblName") //temp table registration
or
df.saveAsTable("TblName") //actual physical table registration
and query with
sqlContext.sql("select * from TblName limit 100").take(100).foreach(println)
or any other spark-sql query.
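For the concrete case in the question (pick an id column and a price column and return the row with the highest price), something along these lines should work once the temp table is registered. I'm assuming TAXROLL_NUMBER plays the role of the id and ASSESSED_VALUE the price; since ASSESSED_VALUE was read as a string, cast it before ordering:
// sketch only: column names taken from the schema in the question
sqlContext.sql("""
  SELECT TAXROLL_NUMBER, ASSESSED_VALUE
  FROM TblName
  ORDER BY CAST(ASSESSED_VALUE AS DOUBLE) DESC
  LIMIT 1
""").show()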
I think your problem comes from trying to do RDD work after reading the CSV with the spark-csv package. The element type this package returns is, as you stated, org.apache.spark.sql.Row. You can still use the RDD approach easily: just read the csv file with textFile. For example:
case class tmpSchema(TAXROLL_NUMBER: String, BUILDING_NAME: String, ASSESSED_VALUE: String, STREET_NAME: String, CITY: String) // etc.
sc.textFile(pathString).map(_.split(",")).map(p => tmpSchema(p(0), p(1), p(2), p(3), p(4))).toDF.registerTempTable("tblName2")
This method does not require the databricks csv package. On the other hand, if your data contains quoting/encapsulation and escape characters, you are better off using the CSV package.
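Alternatively, if you want to keep the spark-csv reader and still build a rowRDD the way the programming guide shows, remember that each element is already a Row, so pull the fields out of the Row instead of splitting a string. A minimal sketch (field positions match the schema in the question):
import org.apache.spark.sql.Row
// df is the DataFrame loaded with the spark-csv package, as in the question
val rowRDD = df.rdd.map(r => Row(r.getString(0), r.getString(1).trim))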
Upvotes: 3