chanu raj

Reputation: 43

How to read from a text file (String-type data), map it, and load the data into Parquet format (multiple columns with different data types) in Spark Scala dynamically

We are importing data from a source RDBMS system into the Hadoop environment using Sqoop, as text files. These text files then need to be loaded into a Hive table stored in Parquet format. How can we approach this without using Hive support (earlier we used beeline inserts, and we are designing the new flow to not use Hive anymore) and instead write directly to HDFS as Parquet?

Ex: after the Sqoop import, let's say we have a file under the HDFS target dir /data/loc/mydb/Mytable

Data in Mytable, where all fields are of type String:

-----------------------------------------
10|customer1|10.0|2016-09-07  08:38:00.0
20|customer2|20.0|2016-09-08  10:45:00.0
30|customer3|30.0|2016-09-10  03:26:00.0
------------------------------------------

Target Hive table schema:

rec_id: int
rec_name: String
rec_value: Decimal(2,1)
rec_created: Timestamp

How can we load the data from Mytable into the target Hive table's underlying location (Parquet format) using Spark, handling the typecasting for all the columns dynamically?

Please note: we cannot use HiveContext here. Any help with the approach is much appreciated. Thanks in advance.

Upvotes: 3

Views: 1046

Answers (1)

Chema

Reputation: 2828

The example below reads a .csv file in the same format as the one presented in the question.

There are some details that I would like to explain first.

In the table schema, the field rec_value: Decimal(2,1) would have to be rec_value: Decimal(3,1), for the following reason:

The DECIMAL type represents numbers with fixed precision and scale. When you create a DECIMAL column, you specify the precision, p, and scale, s. Precision is the total number of digits, regardless of the location of the decimal point. Scale is the number of digits after the decimal place. To represent the number 10.0 without a loss of precision, you would need a DECIMAL type with precision of at least 3, and scale of at least 1.
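A quick way to see this in practice (this snippet is not part of the original answer; it assumes a local SparkSession named spark and the default non-ANSI cast behaviour):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// "10.0" does not fit into DECIMAL(2,1) (max 9.9), so the cast overflows and yields null;
// DECIMAL(3,1) keeps the value.
Seq("10.0").toDF("rec_value")
  .select(
    col("rec_value").cast("decimal(2,1)").as("as_decimal_2_1"),
    col("rec_value").cast("decimal(3,1)").as("as_decimal_3_1"))
  .show()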

So the Hive table would be:

CREATE TABLE tab_data (
  rec_id INT,
  rec_name STRING,
  rec_value DECIMAL(3,1),
  rec_created TIMESTAMP
) STORED AS PARQUET;

The full Scala code:

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType, TimestampType}

object CsvToParquet {

  val spark = SparkSession
    .builder()
    .appName("CsvToParquet")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
    .config("spark.sql.parquet.writeLegacyFormat", true) // To avoid issues with data type between Spark and Hive
                                                         // The convention used by Spark to write Parquet data is configurable.
                                                         // This is determined by the property spark.sql.parquet.writeLegacyFormat
                                                         // The default value is false. If set to "true",
                                                         // Spark will use the same convention as Hive for writing the Parquet data.
    .getOrCreate()

  val sc = spark.sparkContext

  val inputPath = "hdfs://host:port/user/...../..../tab_data.csv"
  val outputPath = "hdfs://host:port/user/hive/warehouse/test.db/tab_data"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      val DecimalType = DataTypes.createDecimalType(3, 1)

      /**
        * the data schema
        */
      val schema = StructType(List(StructField("rec_id", IntegerType, true), StructField("rec_name",StringType, true),
                   StructField("rec_value",DecimalType),StructField("rec_created",TimestampType, true)))

      /**
        * Reading the data from HDFS as .csv text file
        */
      val data = spark
        .read
        .option("sep","|")
        .option("timestampFormat","yyyy-MM-dd HH:mm:ss.S")
        .option("inferSchema",false)
        .schema(schema)
        .csv(inputPath)

      data.show(truncate = false)
      data.schema.printTreeString()

      /**
        * Writing the data as Parquet file
        */
      data
        .write
        .mode(SaveMode.Append)
        .option("compression", "none") // Assuming no data compression
        .parquet(outputPath)

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

Input .csv file with pipe-separated fields

10|customer1|10.0|2016-09-07  08:38:00.0
20|customer2|24.0|2016-09-08  10:45:00.0
30|customer3|35.0|2016-09-10  03:26:00.0
40|customer1|46.0|2016-09-11  08:38:00.0
........

reading from Spark

+------+---------+---------+-------------------+
|rec_id|rec_name |rec_value|rec_created        |
+------+---------+---------+-------------------+
|10    |customer1|10.0     |2016-09-07 08:38:00|
|20    |customer2|24.0     |2016-09-08 10:45:00|
|30    |customer3|35.0     |2016-09-10 03:26:00|
|40    |customer1|46.0     |2016-09-11 08:38:00|
......

schema

root
 |-- rec_id: integer (nullable = true)
 |-- rec_name: string (nullable = true)
 |-- rec_value: decimal(3,1) (nullable = true)
 |-- rec_created: timestamp (nullable = true)
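
Because the question rules out HiveContext, the written files can also be verified straight from their HDFS location with plain Spark. A minimal sketch, reusing the spark session and the outputPath defined above:

// Read the Parquet files back directly from the table location, no Hive involved
val written = spark.read.parquet(outputPath)
written.printSchema()
written.show(truncate = false)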

reading from Hive

SELECT *
FROM tab_data;

+------------------+--------------------+---------------------+------------------------+--+
| tab_data.rec_id  | tab_data.rec_name  | tab_data.rec_value  |  tab_data.rec_created  |
+------------------+--------------------+---------------------+------------------------+--+
| 10               | customer1          | 10                  | 2016-09-07 08:38:00.0  |
| 20               | customer2          | 24                  | 2016-09-08 10:45:00.0  |
| 30               | customer3          | 35                  | 2016-09-10 03:26:00.0  |
| 40               | customer1          | 46                  | 2016-09-11 08:38:00.0  |
.....
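
Regarding the "dynamically" part of the question: the schema does not have to be hard-coded in the job. As a sketch, assuming Spark 2.2+ where StructType.fromDDL is available, the column definitions could be supplied as a DDL string (for example from a hypothetical per-table config or job argument), and the rest of the code above stays the same:

import org.apache.spark.sql.types.StructType

// Hypothetical per-table column list, e.g. read from a config file or passed as an argument
val ddl = "rec_id INT, rec_name STRING, rec_value DECIMAL(3,1), rec_created TIMESTAMP"
val dynamicSchema = StructType.fromDDL(ddl)

val dynData = spark.read
  .option("sep", "|")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.S")
  .schema(dynamicSchema)
  .csv(inputPath)

dynData.write.mode(SaveMode.Append).parquet(outputPath)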

Hope this helps.

Upvotes: 1
