Reputation: 2085
I am trying to ingest an RDBMS table into Hive. I have obtained the dataframe in the following way:
val yearDF = spark.read.format("jdbc").option("url", connectionUrl)
.option("dbtable", "(select * from schema.tablename where source_system_name='DB2' and period_year='2017') as year2017")
.option("user", devUserName)
.option("password", devPassword)
.option("numPartitions",15)
.load()
These are the columns of the dataframe:
geography:string
project:string
reference_code:string
product_line:string
book_type:string
cc_region:string
cc_channel:string
cc_function:string
pl_market:string
ptd_balance:double
qtd_balance:double
ytd_balance:double
xx_last_update_tms:timestamp
xx_last_update_log_id:int
xx_data_hash_code:string
xx_data_hash_id:bigint
The columns ptd_balance, qtd_balance and ytd_balance are doubles holding precision-sensitive values. Our project wants to convert them from Double to String by creating new columns ptd_balance_text, qtd_balance_text and ytd_balance_text with the same data, in order to avoid any data truncation.
withColumn will create a new column in the dataframe, and withColumnRenamed will rename an existing column.
The dataframe has nearly 10 million records. Is there an effective way to create multiple new columns with the same data but a different type from the existing columns of a dataframe?
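For illustration, here is the per-column version with withColumn that I am trying to avoid repeating (an untested sketch):
import org.apache.spark.sql.functions.col

// one withColumn call per balance column, casting double to string
val yearWithTextDF = yearDF
  .withColumn("ptd_balance_text", col("ptd_balance").cast("string"))
  .withColumn("qtd_balance_text", col("qtd_balance").cast("string"))
  .withColumn("ytd_balance_text", col("ytd_balance").cast("string"))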
Upvotes: 0
Views: 167
Reputation: 1053
If I were in your shoes, I would change the extraction query, or ask the BI team to put in some effort :P to add and cast the fields on the fly while extracting; but anyhow, what you are asking is possible.
You can add the new columns from the existing ones as below. Check the addColsTosampleDF dataframe. I hope the comments below will be enough to understand; if you have any questions, feel free to add them in the comments and I will edit my answer.
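For the extraction-query idea, here is a rough sketch against your own dbtable subquery (the varchar(64) length is my assumption, and the exact CAST syntax depends on your source RDBMS):
val yearDF = spark.read.format("jdbc").option("url", connectionUrl)
  .option("dbtable", """(select t.*,
        cast(ptd_balance as varchar(64)) as ptd_balance_text,
        cast(qtd_balance as varchar(64)) as qtd_balance_text,
        cast(ytd_balance as varchar(64)) as ytd_balance_text
      from schema.tablename t
      where source_system_name='DB2' and period_year='2017') as year2017""")
  .option("user", devUserName)
  .option("password", devPassword)
  .option("numPartitions", 15)
  .load()
And here is the DataFrame way, in a spark-shell session: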
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
scala> val ss = SparkSession.builder().appName("TEST").getOrCreate()
18/08/07 15:51:42 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
ss: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6de4071b
//Sample dataframe with int, double and string fields
scala> val sampleDf = Seq((100, 1.0, "row1"),(1,10.12,"col_float")).toDF("col1", "col2", "col3")
sampleDf: org.apache.spark.sql.DataFrame = [col1: int, col2: double ... 1 more field]
scala> sampleDf.printSchema
root
|-- col1: integer (nullable = false)
|-- col2: double (nullable = false)
|-- col3: string (nullable = true)
//Adding columns col1_string from col1 and col2_doubletostring from col2 with casting and alias
scala> val addColsTosampleDF = sampleDf.
select(sampleDf.col("col1"),
sampleDf.col("col2"),
sampleDf.col("col3"),
sampleDf.col("col1").cast("string").alias("col1_string"),
sampleDf.col("col2").cast("string").alias("col2_doubletostring"))
addColsTosampleDF: org.apache.spark.sql.DataFrame = [col1: int, col2: double ... 3 more fields]
//Schema with added columns
scala> addColsTosampleDF.printSchema
root
|-- col1: integer (nullable = false)
|-- col2: double (nullable = false)
|-- col3: string (nullable = true)
|-- col1_string: string (nullable = false)
|-- col2_doubletostring: string (nullable = false)
scala> addColsTosampleDF.show()
+----+-----+---------+-----------+-------------------+
|col1| col2| col3|col1_string|col2_doubletostring|
+----+-----+---------+-----------+-------------------+
| 100| 1.0| row1| 100| 1.0|
| 1|10.12|col_float| 1| 10.12|
+----+-----+---------+-----------+-------------------+
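Applied to your yearDF, the same select-with-cast pattern would look like this (a sketch; I have not run it against your exact schema):
val yearWithTextDF = yearDF.select(
  yearDF.col("*"),
  yearDF.col("ptd_balance").cast("string").alias("ptd_balance_text"),
  yearDF.col("qtd_balance").cast("string").alias("qtd_balance_text"),
  yearDF.col("ytd_balance").cast("string").alias("ytd_balance_text"))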
Upvotes: 1
Reputation: 1892
You can do this by building a query (a list of Column expressions) from all the columns, like below:
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType
//Input:
scala> df.show
+----+-----+--------+--------+
| id| name| salary| bonus|
+----+-----+--------+--------+
|1001|Alice| 8000.25|1233.385|
|1002| Bob|7526.365| 1856.69|
+----+-----+--------+--------+
scala> df.printSchema
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- salary: double (nullable = false)
|-- bonus: double (nullable = false)
//solution approach:
val query = df.columns.toList.map(cl =>
  if (cl == "salary" || cl == "bonus") col(cl).cast(StringType).as(cl + "_text")
  else col(cl))
//Output:
scala> df.select(query:_*).printSchema
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- salary_text: string (nullable = false)
|-- bonus_text: string (nullable = false)
scala> df.select(query:_*).show
+----+-----+-----------+----------+
| id| name|salary_text|bonus_text|
+----+-----+-----------+----------+
|1001|Alice| 8000.25| 1233.385|
|1002| Bob| 7526.365| 1856.69|
+----+-----+-----------+----------+
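Adapted to the question's balance columns, and keeping each original column next to its _text twin (a sketch; the column names are taken from the question):
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType

val balanceCols = Set("ptd_balance", "qtd_balance", "ytd_balance")
// emit each column as-is, plus a string copy for the balance columns
val query = yearDF.columns.toList.flatMap(cl =>
  if (balanceCols.contains(cl)) List(col(cl), col(cl).cast(StringType).as(cl + "_text"))
  else List(col(cl)))
val yearWithTextDF = yearDF.select(query: _*)
Since query is built from the dataframe's own columns, this scales to any number of columns without listing them by hand.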
Upvotes: 1