Reputation: 4035
How can a string column be split by comma into a new dataframe with applied schema?
As an example, here's a pyspark DataFrame with two columns (id
and value
)
df = sc.parallelize([(1, "200,201,hello"), (2, "23,24,hi")]).toDF(["id", "value"])
I want to get and split the value
column into a new DataFrame and apply the following schema:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
message_schema = StructType(
[
StructField("id", IntegerType()),
StructField("value", IntegerType()),
StructField("message", StringType()),
]
)
What would work is:
df_split = (
df.select(split(df.value, ",\s*"))
.rdd.flatMap(lambda x: x)
.toDF()
)
df_split.show()
But I would still need to cast and rename the columns based on the schema:
df_split.select(
[
col(_name).cast(_schema.dataType).alias(_schema.name)
for _name, _schema in zip(df_split.columns, message_schema)
]
).show()
with the expected result:
+---+-----+-------+
| id|value|message|
+---+-----+-------+
|200| 201| hello|
| 23| 24| hi|
+---+-----+-------+
Upvotes: 1
Views: 784
Reputation: 84
I have mentioned each and every steps in details so beginners can easily get it
#Importing Libs
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import*
#creating SparkSession
spark = SparkSession.builder.appName("stackoverflow").getOrCreate()
#data
data = [(1, "200,201,hello"), (2, "23,24,hi")]
#creating dataframe
df=spark.createDataFrame(data)
df=df.drop(df['_1'])
df.show()
+-------------+
| _2|
+-------------+
|200,201,hello|
| 23,24,hi|
+-------------+
df1=df.withColumn("id", split(df["_2"], ",").getItem(0))\
.withColumn("Value", split(df["_2"], ",").getItem(1))\
.withColumn("message", split(df["_2"], ",").getItem(2))
df1=df1.drop(df["_2"])
df1.show()
+---+-----+-------+
| id|Value|message|
+---+-----+-------+
|200| 201| hello|
| 23| 24| hi|
+---+-----+-------+
#casting as per defined schema
dff=df1.withColumn("id", df1['id'].cast(IntegerType())).withColumn("Value", df1['Value'].cast(IntegerType())).withColumn("message", df1['message'].cast(StringType()))
dff.show()
+---+-----+-------+
| id|Value|message|
+---+-----+-------+
|200| 201| hello|
| 23| 24| hi|
+---+-----+-------+
Upvotes: 0
Reputation: 32700
For Spark 3+, there a is function from_csv
that you can use to parse the comma delimited string using message_schema
schema in DDL format:
import pyspark.sql.functions as F
df1 = df.withColumn(
"message",
F.from_csv("value", message_schema.simpleString())
).select("message.*")
df1.show()
#+---+-----+-------+
#| id|value|message|
#+---+-----+-------+
#|200| 201| hello|
#| 23| 24| hi|
#+---+-----+-------+
df1.printSchema()
#root
# |-- id: integer (nullable = true)
# |-- value: integer (nullable = true)
# |-- message: string (nullable = true)
Upvotes: 0