Reputation: 67
I have a DataFrame (dataDF) which contains data like:
firstColumn;secondColumn;thirdColumn
myText;123;2010-08-12 00:00:00
In my case, all of these columns are StringType.
On the other hand, I have another DataFrame (customTypeDF) which can be modified and contains, for some columns, a custom type like:
columnName;customType
secondColumn;IntegerType
thirdColumn;TimestampType
How can I dynamically apply the new types to my dataDF DataFrame?
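For reference, a minimal sketch reproducing the two DataFrames above (names and sample values taken straight from the question; a spark-shell session with spark.implicits._ available is assumed):

import spark.implicits._

// Sample data from the question; every column starts out as a string.
val dataDF = Seq(("myText", "123", "2010-08-12 00:00:00"))
  .toDF("firstColumn", "secondColumn", "thirdColumn")

// The desired type for some of the columns, also stored as strings.
val customTypeDF = Seq(
  ("secondColumn", "IntegerType"),
  ("thirdColumn", "TimestampType")
).toDF("columnName", "customType")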
Upvotes: 0
Views: 426
Reputation: 712
In my case it works using createTableColumnTypes, but I cannot pass plain "varchar" as the type; it has to be "varchar(length)" in my case, for inserting into a Postgres DB.
My conflict was that Spark maps StringType to text by default, so I had to pass an explicit varchar:
custom_schema ="""
anio_fabricacion int,
anio_nacimiento varchar(4),
avaluo_habitacional integer"""
(sdf.write
    .mode('overwrite')
    .format("jdbc")
    .option("url", conf['fullUrl'])
    .option("dbtable", dbtable)
    .option("user", conf['user'])
    .option("password", conf['password'])
    .option("driver", "org.postgresql.Driver")
    # Override the column types used in the generated CREATE TABLE
    .option("createTableColumnTypes", custom_schema)
    .save())
Upvotes: 0
Reputation: 42422
You can map over dataDF's column names, using customTypeDF collected as a Seq:
import org.apache.spark.sql.functions.col

// Collect the (columnName, customType) rows as a local Seq of strings
val colTypes = customTypeDF.rdd.map(x => x.toSeq.asInstanceOf[Seq[String]]).collect

// Cast every column that has an entry in colTypes;
// "IntegerType" -> "integer", "TimestampType" -> "timestamp", etc.
val result = dataDF.select(
  dataDF.columns.map(c =>
    if (colTypes.map(_(0)).contains(c))
      col(c).cast(colTypes.filter(_(0) == c)(0)(1).toLowerCase.replace("type", "")).as(c)
    else col(c)
  ): _*
)
result.show
+-----------+------------+-------------------+
|firstColumn|secondColumn| thirdColumn|
+-----------+------------+-------------------+
| myText| 123|2010-08-12 00:00:00|
+-----------+------------+-------------------+
result.printSchema
root
|-- firstColumn: string (nullable = true)
|-- secondColumn: integer (nullable = true)
|-- thirdColumn: timestamp (nullable = true)
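As a variation on the same idea, the lookup reads a little more directly if the mappings are first collected into a Map. A minimal sketch, assuming (as in the question) that both columns of customTypeDF are strings:

import org.apache.spark.sql.functions.col

// Collect (columnName -> customType) pairs into a local Map,
// normalizing "IntegerType" -> "integer" and so on.
val typeMap: Map[String, String] = customTypeDF.collect
  .map(r => r.getString(0) -> r.getString(1).toLowerCase.replace("type", ""))
  .toMap

// Fold over the mappings, casting each listed column if it exists.
val result2 = typeMap.foldLeft(dataDF) { case (df, (name, tpe)) =>
  if (df.columns.contains(name)) df.withColumn(name, col(name).cast(tpe)) else df
}

result2 should show the same schema as the select above.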
Upvotes: 1