teddy

Reputation: 423

Write Queries for .csv file in spark-shell

Can anyone tell me how to write queries using spark-shell for .csv file?

What I have achieved was to read a .csv file using databricks library and create a dataframe as shown below:

./spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("mylocalpath.csv")

Then I can do df.printSchema() and other DataFrame operations without any problem. But I was wondering how I can write some queries?

I saw the instructions at http://spark.apache.org/docs/latest/sql-programming-guide.html, which mention Programmatically Specifying the Schema. I followed that procedure, only reading a .csv file instead of a text file, but when I did val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)), I got an error saying value split is not a member of org.apache.spark.sql.Row. How can I fix this problem?

And if there is an easier way to write SQL queries, please let me know. What I ultimately want to do is something simple: select two columns, one for id and one for price, and return the highest price.

df.printSchema() looks like this:

|-- TAXROLL_NUMBER: string (nullable = true)
|-- BUILDING_NAME: string (nullable = true)
|-- ASSESSED_VALUE: string (nullable = true)
|-- STREET_NAME: string (nullable = true)
|-- POSTAL_CODE: string (nullable = true)
|-- CITY: string (nullable = true)
|-- BUILD_YEAR: string (nullable = true)
|-- Lon: string (nullable = true)
|-- Lat: string (nullable = true)

Upvotes: 1

Views: 1800

Answers (2)

Sree Eedupuganti

Reputation: 440

Java code for Spark 2.0.0

    package com.example.SparkReadFile;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;


    public class Driver 
    {
        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession
                   .builder()
                   .appName("Csv reader")
                   .master("local")
                  // .enableHiveSupport()
                   .getOrCreate();

            // .csv(path) is shorthand for .format("csv").load(path)
            Dataset<Row> df = spark.read()
                   .option("header", "true")
                   .option("nullValue", "")
                   .csv("file:///Users/karuturi/Desktop/sample.csv");

            // registerTempTable is deprecated in Spark 2.0; createOrReplaceTempView replaces it
            df.createOrReplaceTempView("people");

            Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
            sqlDF.show();
        }
    }

Upvotes: 0

Zahiro Mor

Reputation: 1718

I'm not sure I follow you completely, but maybe all you need is

df.registerTempTable("TblName") //temp table registration

or

df.saveAsTable("TblName") //actual physical table registration

and query with

sqlContext.sql("select * from TblName limit 100").take(100).foreach(println)

or any other spark-sql query.
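
For the specific query in the question (id plus price, returning the row with the highest price), a sketch like the following should work, assuming the DataFrame was registered as TblName, that TAXROLL_NUMBER is the id, and that ASSESSED_VALUE is the price; the CAST is needed because the schema in the question shows ASSESSED_VALUE as a string:

    // hypothetical query for the "highest price" goal from the question
    sqlContext.sql(
      "SELECT TAXROLL_NUMBER, CAST(ASSESSED_VALUE AS DOUBLE) AS price " +
      "FROM TblName ORDER BY price DESC LIMIT 1"
    ).show()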

I think your problem results from trying to do RDD work after reading the CSV with the spark-csv package. The type this package returns is, as you stated, org.apache.spark.sql.Row. You can still use the RDD approach easily: just read the CSV file with textFile. For example:

case class tmpSchema(TAXROLL_NUMBER: String, BUILDING_NAME: String, ASSESSED_VALUE: String, STREET_NAME: String, CITY: String) // etc.
val toTable = sc.textFile(pathString).map(_.split(",")).map(p => tmpSchema(p(0), p(1), p(2), p(3), p(4))).toDF.registerTempTable("tblName2")

This method does not require the Databricks CSV package. On the other hand, if your data contains quoting or escape characters, you are better off using the CSV package.
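
If you do stay with the spark-csv package, a minimal sketch (assuming the same file and column names as in the question) is to let the package infer numeric types and then answer the "highest price" question with the DataFrame API instead of an RDD:

    // hypothetical: inferSchema makes ASSESSED_VALUE numeric instead of string
    val df = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("mylocalpath.csv")

    df.select("TAXROLL_NUMBER", "ASSESSED_VALUE")
      .orderBy(df("ASSESSED_VALUE").desc)
      .show(1)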

Upvotes: 3
