rose1110

Reputation: 59

How to change column data types dynamically in PySpark

I have a fixed-width file that needs to be split based on position, with the data types taken from a schema file. How do I change the data type? I can cast each column individually, but my requirement is to convert them dynamically using PySpark.

**Text File**

"00120181120xyz1234"
"00220180203abc56792"
"00320181203pqr25483"

**Schema File**

{"Column":"id","From":"1","To":"3","dtype":"int"}      
{"Column":"date","From":"4","To":"8","dtype":"date"}    
{"Column":"name","From":"12","To":"3","dtype":"String"}    
{"Column":"salary","From":"15","To":"5","dtype":"int"}
 
from pyspark.sql.functions import substring

inputfiledf = spark.read.text("text.dat")
SchemaFile = spark.read.json("text.json")
sfDict = [row.asDict() for row in SchemaFile.collect()]

**finaldf split based on position**

finaldf = inputfiledf.select(*[substring(str='value',
    pos=int(row['From']), len=int(row['To'])).alias(row['Column']) for row in sfDict])
finaldf.printSchema()
root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: string (nullable = true)

I'm expecting the date column's data type to be date and the salary column's to be int. Can we do this dynamically?

Upvotes: 1

Views: 1471

Answers (1)

Riley Schack

Reputation: 86

You almost had the solution. You just need to add .cast() inside your list comprehension:

finaldf = inputfiledf.select(
    *[
        substring(str="value", pos=int(row["From"]), len=int(row["To"]))
        .alias(row["Column"])
        .cast(row["dtype"])
        for row in sfDict
    ]
)
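
With the cast applied, printSchema() should report the types from the schema file. One caveat, which is an assumption about the sample data rather than part of the question: if the date field is laid out as yyyyMMdd, a plain .cast("date") may not parse it, since Spark's string-to-date cast expects an ISO-style yyyy-MM-dd value. A sketch that parses dates with an explicit format and casts everything else could look like this:

from pyspark.sql.functions import substring, to_date

# Sketch only: assumes sfDict is a list of dicts with the keys shown in the
# schema file ("Column", "From", "To", "dtype") and that dates use yyyyMMdd.
cols = []
for row in sfDict:
    c = substring(str="value", pos=int(row["From"]), len=int(row["To"]))
    if row["dtype"] == "date":
        c = to_date(c, "yyyyMMdd")   # parse the date with an explicit format
    else:
        c = c.cast(row["dtype"])
    cols.append(c.alias(row["Column"]))

finaldf = inputfiledf.select(*cols)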

It may also be better to replace sfDict with the following:

import json


def schema():
    with open("text.json", "r") as f:
        for line in f:
            yield json.loads(line)


sfDict = schema()
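
One more note, not from the original answer: schema() returns a generator, so it is exhausted after a single pass. If the column list needs to be rebuilt more than once, materialize it first:

sfDict = list(schema())  # keep the schema rows in memory so they can be iterated repeatedly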

Upvotes: 1
