Joost Döbken

Reputation: 4035

PySpark split column to new dataframe with applied schema

How can a string column be split by comma into a new dataframe with applied schema?

As an example, here's a pyspark DataFrame with two columns (id and value)

df = sc.parallelize([(1, "200,201,hello"), (2, "23,24,hi")]).toDF(["id", "value"])

I want to get and split the value column into a new DataFrame and apply the following schema:

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

message_schema = StructType(
    [
        StructField("id", IntegerType()),
        StructField("value", IntegerType()),
        StructField("message", StringType()),
    ]
)

The following works:

from pyspark.sql.functions import split

df_split = (
    df.select(split(df.value, r",\s*"))
    .rdd.flatMap(lambda x: x)
    .toDF()
)
df_split.show()

But I would still need to cast and rename the columns based on the schema:

from pyspark.sql.functions import col

df_split.select(
    [
        col(_name).cast(_field.dataType).alias(_field.name)
        for _name, _field in zip(df_split.columns, message_schema)
    ]
).show()

with the expected result:

+---+-----+-------+
| id|value|message|
+---+-----+-------+
|200|  201|  hello|
| 23|   24|     hi|
+---+-----+-------+
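Outside Spark, the same per-row transformation can be sketched in plain Python (a minimal illustration only; the parse_value helper is made up, not part of the question's code):

```python
import csv
import io

def parse_value(value):
    """Split a comma-separated string and cast the fields to match the
    target schema: (int id, int value, string message)."""
    # csv handles the comma splitting; strip() mirrors the ",\s*" regex
    fields = [f.strip() for f in next(csv.reader(io.StringIO(value)))]
    return int(fields[0]), int(fields[1]), fields[2]

rows = [parse_value(v) for _, v in [(1, "200,201,hello"), (2, "23,24,hi")]]
print(rows)  # [(200, 201, 'hello'), (23, 24, 'hi')]
```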

Upvotes: 1

Views: 784

Answers (2)

mr.data_engg

Reputation: 84

I have described each step in detail so that beginners can follow along easily:

#Importing Libs
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

#creating SparkSession
spark = SparkSession.builder.appName("stackoverflow").getOrCreate()

#data 
data = [(1, "200,201,hello"), (2, "23,24,hi")]

#creating dataframe
df = spark.createDataFrame(data)

df = df.drop("_1")
df.show()
+-------------+
|           _2|
+-------------+
|200,201,hello|
|     23,24,hi|
+-------------+
split_col = split(df["_2"], ",")
df1 = (
    df.withColumn("id", split_col.getItem(0))
    .withColumn("Value", split_col.getItem(1))
    .withColumn("message", split_col.getItem(2))
)

df1 = df1.drop("_2")
df1.show()

+---+-----+-------+
| id|Value|message|
+---+-----+-------+
|200|  201|  hello|
| 23|   24|     hi|
+---+-----+-------+

#casting as per the defined schema
dff = (
    df1.withColumn("id", df1["id"].cast(IntegerType()))
    .withColumn("Value", df1["Value"].cast(IntegerType()))
    .withColumn("message", df1["message"].cast(StringType()))
)
dff.show()
+---+-----+-------+
| id|Value|message|
+---+-----+-------+
|200|  201|  hello|
| 23|   24|     hi|
+---+-----+-------+

Upvotes: 0

blackbishop

Reputation: 32700

For Spark 3+, there is a function from_csv that you can use to parse the comma-delimited string, using the message_schema schema in DDL format:

import pyspark.sql.functions as F

df1 = df.withColumn(
    "message",
    F.from_csv("value", message_schema.simpleString())
).select("message.*")

df1.show()
#+---+-----+-------+
#| id|value|message|
#+---+-----+-------+
#|200|  201|  hello|
#| 23|   24|     hi|
#+---+-----+-------+

df1.printSchema()
#root
# |-- id: integer (nullable = true)
# |-- value: integer (nullable = true)
# |-- message: string (nullable = true)
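The schema argument of from_csv is a DDL-formatted string; message_schema.simpleString() returns struct<id:int,value:int,message:string>. A minimal plain-Python sketch of how that string is assembled from the field definitions (the fields list here just mirrors the question's schema for illustration):

```python
# Mirror of the question's message_schema as (name, Spark DDL type) pairs
fields = [("id", "int"), ("value", "int"), ("message", "string")]

# StructType.simpleString() joins name:type pairs inside struct<...>
ddl = "struct<" + ",".join(f"{name}:{typ}" for name, typ in fields) + ">"
print(ddl)  # struct<id:int,value:int,message:string>
```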

Upvotes: 0
