Azo

Reputation: 67

Spark Scala DataFrame: How can I apply custom types to an existing DataFrame?

I have a DataFrame (dataDF) that contains data like:

firstColumn;secondColumn;thirdColumn
myText;123;2010-08-12 00:00:00

In my case, all of these columns are StringType.

On the other hand, I have another DataFrame (customTypeDF), which can be modified and which specifies a custom type for some of the columns:

columnName;customType
secondColumn;IntegerType
thirdColumn; TimestampType

How can I dynamically apply the new types to my dataDF DataFrame?

Upvotes: 0

Views: 426

Answers (2)

In my case, using the createTableColumnTypes option worked, but I could not use plain "varchar" as a type; I had to use "varchar(length)" when inserting into a Postgres database.

My problem was getting Spark's string columns written as varchar.

See the Spark documentation on the JDBC data source options.

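# Column types Spark uses when it creates the target table;
# a bare "varchar" is not accepted, so give a length such as varchar(4)
custom_schema = """
anio_fabricacion int,
anio_nacimiento varchar(4),
avaluo_habitacional integer"""

# sdf, conf and dbtable are defined elsewhere in the author's code
(sdf.write
        .mode('overwrite')
        .format("jdbc")
        .option("url", conf['fullUrl'])
        .option("dbtable", dbtable)
        .option("user", conf['user'])
        .option("password", conf['password'])
        .option("driver", "org.postgresql.Driver")
        .option("createTableColumnTypes", custom_schema)
        .save())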
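On the Scala side, the same createTableColumnTypes string can be assembled from (column, database type) pairs, for example pairs derived from something like customTypeDF. This is a minimal sketch: the object `JdbcColumnTypes` and its methods `render` and `varchar` are hypothetical helpers, not part of Spark. The length requirement follows the answer above, which reports that a bare "varchar" was rejected.

```scala
object JdbcColumnTypes {
  // Hypothetical helper (not part of Spark): renders (column, database type)
  // pairs in the "name TYPE, name TYPE" format used by createTableColumnTypes.
  def render(types: Seq[(String, String)]): String =
    types
      .map { case (name, dbType) => s"${name.trim} ${dbType.trim}" }
      .mkString(", ")

  // Hypothetical helper: always emit varchar with an explicit, positive length,
  // since a bare "varchar" was reported not to work.
  def varchar(length: Int): String = {
    require(length > 0, "varchar length must be positive")
    s"varchar($length)"
  }
}
```

The result would then be passed as `.option("createTableColumnTypes", JdbcColumnTypes.render(Seq("anio_fabricacion" -> "int", "anio_nacimiento" -> JdbcColumnTypes.varchar(4))))`.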

Upvotes: 0

mck

Reputation: 42422

You can collect customTypeDF to the driver as a sequence of (columnName, customType) rows, then map over dataDF's columns and cast each column that has an entry:

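import org.apache.spark.sql.functions.col

// Collect customTypeDF's (columnName, customType) rows to the driver
val colTypes = customTypeDF.rdd.map(x => x.toSeq.asInstanceOf[Seq[String]]).collect

// Cast each listed column, e.g. "IntegerType" -> cast("integer");
// trim guards against stray spaces such as " TimestampType" in the question
val result = dataDF.select(
    dataDF.columns.map(c =>
        if (colTypes.map(_(0)).contains(c))
        col(c).cast(colTypes.filter(_(0) == c)(0)(1).trim.toLowerCase.replace("type","")).as(c)
        else col(c)
    ):_*
)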
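The code above filters colTypes twice for every column. A sketch of a variant, assuming the same two-column layout of customTypeDF, normalises the type names once into a Map and reuses the answer's lowercase-and-strip-"type" rule. The object `CastLookup` and its methods are illustrative names, not a Spark API:

```scala
object CastLookup {
  // Same rule as the answer: "IntegerType" -> "integer",
  // " TimestampType" -> "timestamp" (trim first to drop stray spaces).
  def toCastString(typeName: String): String =
    typeName.trim.toLowerCase.replace("type", "")

  // Build columnName -> cast target once instead of filtering per column.
  // If a column name appears twice, the later row wins (Map semantics).
  def build(rows: Seq[(String, String)]): Map[String, String] =
    rows.map { case (name, typeName) => name.trim -> toCastString(typeName) }.toMap
}
```

In Spark, the rows could come from `customTypeDF.collect.toSeq.map(r => (r.getString(0), r.getString(1)))`, and with `val lookup = CastLookup.build(rows)` the select becomes `dataDF.select(dataDF.columns.map(c => lookup.get(c).fold(col(c))(t => col(c).cast(t).as(c))): _*)`, which leaves columns without an entry untouched.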

result.show
+-----------+------------+-------------------+
|firstColumn|secondColumn|        thirdColumn|
+-----------+------------+-------------------+
|     myText|         123|2010-08-12 00:00:00|
+-----------+------------+-------------------+

result.printSchema
root
 |-- firstColumn: string (nullable = true)
 |-- secondColumn: integer (nullable = true)
 |-- thirdColumn: timestamp (nullable = true)

Upvotes: 1
