Reputation: 59
I have a fixed-width file that needs to be split into columns based on the positions and data types defined in a schema file. How can I change the data types? I could cast each column manually, but my requirement is to convert them dynamically using PySpark.
** TextFile **
"00120181120xyz1234"
"00220180203abc56792"
"00320181203pqr25483"
** Schema File **
{"Column":"id","From":"1","To":"3","dtype":"int"}
{"Column":"date","From":"4","To":"8","dtype":"date"}
{"Column":"name","From":"12","To":"3","dtype":"String"}
{"Column":"salary","From":"15","To":"5","dtype":"int"}
from pyspark.sql.functions import substring

datafile = spark.read.text("text.dat")       # each line becomes one string column named "value"
SchemaFile = spark.read.json("text.json")
sfDict = map(lambda x: x.asDict(), SchemaFile.collect())

** finaldf split based on position **

finaldf = datafile.select(*[substring(str='value',
    pos=int(row['From']), len=int(row['To'])).alias(row['Column']) for row in sfDict])
finaldf.printSchema()
root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: string (nullable = true)
I am expecting the date column to have the Date data type and the salary column to have the Int data type. Can we do this dynamically?
Upvotes: 1
Views: 1471
Reputation: 86
You almost had the solution. You just need to add .cast()
inside of your list comprehension:
finaldf = datafile.select(
    *[
        substring(str="value", pos=int(row["From"]), len=int(row["To"]))
        .alias(row["Column"])
        .cast(row["dtype"])
        for row in sfDict
    ]
)
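One caveat worth noting: casting a string such as 20181120 straight to date usually returns null, because Spark's string-to-date cast only understands ISO-style values like 2018-11-20. If that matters here, a minimal sketch (assuming the yyyyMMdd layout from the sample file, and that sfDict has been rebuilt, since the iterator above is consumed by the select) is to special-case the date columns with to_date:

from pyspark.sql.functions import substring, to_date

cols = []
for row in sfDict:
    raw = substring(str="value", pos=int(row["From"]), len=int(row["To"]))
    if row["dtype"].lower() == "date":
        # cast("date") on "20181120" would give null, so parse the format explicitly
        col = to_date(raw, "yyyyMMdd")
    else:
        col = raw.cast(row["dtype"])
    cols.append(col.alias(row["Column"]))

finaldf = datafile.select(*cols)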
It may also be better to replace sfDict
with the following:
import json

def schema():
    with open("text.json", "r") as f:
        for line in f:
            yield json.loads(line)

sfDict = schema()
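Keep in mind that schema() returns a generator, which can only be consumed once; if you need to iterate over the schema more than once (for example, to build several selects), it may be safer to materialize it:

# materialize the generator so the schema rows can be reused (only needed if iterated more than once)
sfDict = list(schema())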
Upvotes: 1