Reputation: 43
We are importing data from a source RDBMS into the Hadoop environment using Sqoop, as text files. These text files then need to be loaded into a Hive table stored as Parquet. How can we approach this scenario without using Hive support (earlier we used Beeline inserts, and we are designing the new flow not to use Hive anymore) and write directly to HDFS as Parquet?
Example: after the Sqoop import, let's say we have a file under the HDFS target dir /data/loc/mydb/Mytable.
The data in Mytable looks as below, and all fields are of type String:
-----------------------------------------
10|customer1|10.0|2016-09-07 08:38:00.0
20|customer2|20.0|2016-09-08 10:45:00.0
30|customer3|30.0|2016-09-10 03:26:00.0
------------------------------------------
Target Hive table schema:
rec_id: int
rec_name: String
rec_value: Decimal(2,1)
rec_created: Timestamp
How can we load the data from Mytable into the target Hive table's underlying location (in Parquet format) using Spark, managing the typecasting for all the columns dynamically?
Please note: we cannot use HiveContext here. Any help with the approach is much appreciated. Thanks in advance.
Upvotes: 3
Views: 1046
Reputation: 2828
The example below reads a .csv file in the same format as presented in the question.
There are some details that I would like to explain first.
In the table schema, the field rec_value: Decimal(2,1) would have to be rec_value: Decimal(3,1), for the following reason:
The DECIMAL type represents numbers with fixed precision and scale. When you create a DECIMAL column, you specify the precision, p, and the scale, s. Precision is the total number of digits, regardless of the location of the decimal point. Scale is the number of digits after the decimal place. To represent the number 10.0 without a loss of precision, you would need a DECIMAL type with a precision of at least 3 and a scale of at least 1.
So the Hive table would be:
CREATE TABLE tab_data (
  rec_id INT,
  rec_name STRING,
  rec_value DECIMAL(3,1),
  rec_created TIMESTAMP
) STORED AS PARQUET;
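As a quick check of why the wider precision is needed, casting the sample value with Spark shows the difference (a minimal sketch using a SparkSession like the one in the full code below; with Spark's default non-ANSI settings an overflowing cast simply comes back as NULL):
// DECIMAL(2,1) leaves only one digit before the decimal point, so 10.0 overflows it.
spark.sql(
  """SELECT CAST('10.0' AS DECIMAL(2,1)) AS as_decimal_2_1,
    |       CAST('10.0' AS DECIMAL(3,1)) AS as_decimal_3_1""".stripMargin
).show()
// as_decimal_2_1 comes back as NULL, as_decimal_3_1 keeps the value 10.0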
The full Scala code:
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType, TimestampType}

object CsvToParquet {

  val spark = SparkSession
    .builder()
    .appName("CsvToParquet")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "200") // Change to a more reasonable default number of partitions for our data
    .config("spark.sql.parquet.writeLegacyFormat", true) // To avoid issues with data types between Spark and Hive
    // The convention used by Spark to write Parquet data is configurable.
    // This is determined by the property spark.sql.parquet.writeLegacyFormat.
    // The default value is false. If set to "true",
    // Spark will use the same convention as Hive for writing the Parquet data.
    .getOrCreate()

  val sc = spark.sparkContext

  val inputPath = "hdfs://host:port/user/...../..../tab_data.csv"
  val outputPath = "hdfs://host:port/user/hive/warehouse/test.db/tab_data"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      val DecimalType = DataTypes.createDecimalType(3, 1)

      /**
        * the data schema
        */
      val schema = StructType(List(
        StructField("rec_id", IntegerType, true),
        StructField("rec_name", StringType, true),
        StructField("rec_value", DecimalType),
        StructField("rec_created", TimestampType, true)))

      /**
        * Reading the data from HDFS as .csv text file
        */
      val data = spark
        .read
        .option("sep", "|")
        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.S")
        .option("inferSchema", false)
        .schema(schema)
        .csv(inputPath)

      data.show(truncate = false)
      data.schema.printTreeString()

      /**
        * Writing the data as Parquet file
        */
      data
        .write
        .mode(SaveMode.Append)
        .option("compression", "none") // Assuming no data compression
        .parquet(outputPath)

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}
Input .csv file, with pipe-separated fields:
10|customer1|10.0|2016-09-07 08:38:00.0
20|customer2|24.0|2016-09-08 10:45:00.0
30|customer3|35.0|2016-09-10 03:26:00.0
40|customer1|46.0|2016-09-11 08:38:00.0
........
Reading from Spark:
+------+---------+---------+-------------------+
|rec_id|rec_name |rec_value|rec_created |
+------+---------+---------+-------------------+
|10 |customer1|10.0 |2016-09-07 08:38:00|
|20 |customer2|24.0 |2016-09-08 10:45:00|
|30 |customer3|35.0 |2016-09-10 03:26:00|
|40 |customer1|46.0 |2016-09-11 08:38:00|
......
Schema:
root
|-- rec_id: integer (nullable = true)
|-- rec_name: string (nullable = true)
|-- rec_value: decimal(3,1) (nullable = true)
|-- rec_created: timestamp (nullable = true)
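To double-check the Parquet output itself, you can also read it back with Spark before querying from Hive (a quick sketch reusing outputPath from the code above):
val check = spark.read.parquet(outputPath)
check.printSchema()          // should match the schema printed above
check.show(truncate = false)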
Reading from Hive:
SELECT *
FROM tab_data;
+------------------+--------------------+---------------------+------------------------+--+
| tab_data.rec_id | tab_data.rec_name | tab_data.rec_value | tab_data.rec_created |
+------------------+--------------------+---------------------+------------------------+--+
| 10 | customer1 | 10 | 2016-09-07 08:38:00.0 |
| 20 | customer2 | 24 | 2016-09-08 10:45:00.0 |
| 30 | customer3 | 35 | 2016-09-10 03:26:00.0 |
| 40 | customer1 | 46 | 2016-09-11 08:38:00.0 |
.....
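Regarding the "managing typecasting for all the columns dynamically" part of the question: instead of hard-coding the schema, you can read everything as strings and cast column by column from a mapping. A minimal sketch, assuming a columnTypes sequence that you would build yourself from the target table definition (the names here are illustrative, and spark, inputPath and outputPath are the ones from the code above):
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Hypothetical column -> type mapping, e.g. derived from the target table DDL.
val columnTypes: Seq[(String, DataType)] = Seq(
  "rec_id"      -> IntegerType,
  "rec_name"    -> StringType,
  "rec_value"   -> DataTypes.createDecimalType(3, 1),
  "rec_created" -> TimestampType
)

// Read all columns as strings, then cast each one according to the mapping.
val raw = spark.read
  .option("sep", "|")
  .csv(inputPath)
  .toDF(columnTypes.map(_._1): _*)   // assumes the file columns come in this order

val typed = columnTypes.foldLeft(raw) { case (df, (name, dataType)) =>
  df.withColumn(name, col(name).cast(dataType))
}

typed.write.mode(SaveMode.Append).parquet(outputPath)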
Hope this helps.
Upvotes: 1