Reputation: 69
There is this syntax: df.withColumn('new', regexp_replace('old', 'str', ''))
This is for replacing a string in a column.
My question is: what if I have a column consisting of both arrays and strings? Meaning a row could hold either a string or an array containing that string. Is there any way of replacing this string regardless of whether it appears alone or inside an array?
Upvotes: 0
Views: 9434
Reputation: 2752
Having a column with multiple types is not currently supported. However, if the column contained an array of strings, you could explode the array (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=explode#pyspark.sql.functions.explode), which creates a row for each element in the array, and then apply the regular expression to the new column. Example:
from pyspark import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
sql_context = SQLContext(spark.sparkContext)

df = sql_context.createDataFrame([("hello world",),
                                  ("hello madam",),
                                  ("hello sir",),
                                  ("hello everybody",),
                                  ("goodbye world",)], schema=['test'])

# Wrap the string column in an array so there is something to explode
df = df.withColumn('test', F.array(F.col('test')))
df.show()

# explode() creates one row per array element
df = df.withColumn('test-exploded', F.explode(F.col('test')))
# Apply the regex to the exploded (string) column
df = df.withColumn('test-exploded-regex', F.regexp_replace(F.col('test-exploded'), "hello", "goodbye"))
df.show()
Output:
+-----------------+
| test|
+-----------------+
| [hello world]|
| [hello madam]|
| [hello sir]|
|[hello everybody]|
| [goodbye world]|
+-----------------+
+-----------------+---------------+-------------------+
| test| test-exploded|test-exploded-regex|
+-----------------+---------------+-------------------+
| [hello world]| hello world| goodbye world|
| [hello madam]| hello madam| goodbye madam|
| [hello sir]| hello sir| goodbye sir|
|[hello everybody]|hello everybody| goodbye everybody|
| [goodbye world]| goodbye world| goodbye world|
+-----------------+---------------+-------------------+
And if you wanted to put the results back in an array:
df = df.withColumn('test-exploded-regex-array', F.array(F.col('test-exploded-regex')))
Output:
+-----------------+---------------+-------------------+-------------------------+
| test| test-exploded|test-exploded-regex|test-exploded-regex-array|
+-----------------+---------------+-------------------+-------------------------+
| [hello world]| hello world| goodbye world| [goodbye world]|
| [hello madam]| hello madam| goodbye madam| [goodbye madam]|
| [hello sir]| hello sir| goodbye sir| [goodbye sir]|
|[hello everybody]|hello everybody| goodbye everybody| [goodbye everybody]|
| [goodbye world]| goodbye world| goodbye world| [goodbye world]|
+-----------------+---------------+-------------------+-------------------------+
Hope this helps!
Updated to include the case where the array column holds several strings:
from pyspark import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
sql_context = SQLContext(spark.sparkContext)

df = sql_context.createDataFrame([("hello world", "foo"),
                                  ("hello madam", "bar"),
                                  ("hello sir", "baz"),
                                  ("hello everybody", "boo"),
                                  ("goodbye world", "bah")], schema=['test', 'test2'])

# Build an array column with several strings and keep an id so we can regroup later
df = df.withColumn('test', F.array(F.col('test'), F.col('test2'))).drop('test2')
df = df.withColumn('id', F.monotonically_increasing_id())
df.show()

df = df.withColumn('test-exploded', F.explode(F.col('test')))
df = df.withColumn('test-exploded-regex', F.regexp_replace(F.col('test-exploded'), "hello", "goodbye"))
# Collect the transformed elements back into an array, one row per original id
df = df.groupBy('id').agg(F.collect_list(F.col('test-exploded-regex')).alias('test-exploded-regex-array'))
df.show()
Output:
+--------------------+-----------+
| test| id|
+--------------------+-----------+
| [hello world, foo]| 0|
| [hello madam, bar]| 8589934592|
| [hello sir, baz]|17179869184|
|[hello everybody,...|25769803776|
|[goodbye world, bah]|25769803777|
+--------------------+-----------+
+-----------+-------------------------+
| id|test-exploded-regex-array|
+-----------+-------------------------+
| 8589934592| [goodbye madam, bar]|
| 0| [goodbye world, foo]|
|25769803776| [goodbye everybod...|
|25769803777| [goodbye world, bah]|
|17179869184| [goodbye sir, baz]|
+-----------+-------------------------+
Just drop the id column when you're finished processing!
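As a minimal sketch of that last step (assuming the grouped DataFrame from the example above), dropping the helper column is just:

df = df.drop('id')
df.show()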
Upvotes: 2
Reputation: 1586
I think it is not possible in a Spark DataFrame, since a DataFrame does not allow a column to have multiple types; it will throw an error while creating the DataFrame.
However, you can do it using RDDs.
scala> val seq = Seq((1,"abc"),(2,List("abcd")))
seq: Seq[(Int, java.io.Serializable)] = List((1,abc), (2,List(abcd)))
scala> val rdd1 = sc.parallelize(seq)
rdd1: org.apache.spark.rdd.RDD[(Int, java.io.Serializable)] = ParallelCollectionRDD[2] at parallelize at <console>:26
scala> rdd1.take(2)
res1: Array[(Int, java.io.Serializable)] = Array((1,abc), (2,List(abcd)))
scala> val rdd2 = rdd1.map(x => x._2 match {
| case v: String => (x._1, v.replaceAll("abc","def"))
| case p: List[String] => (x._1, p.map(s => s.replaceAll("abc","def")))
| }
| )
rdd2: org.apache.spark.rdd.RDD[(Int, java.io.Serializable)] = MapPartitionsRDD[3] at map at <console>:25
scala> rdd2.take(2)
res2: Array[(Int, java.io.Serializable)] = Array((1,def), (2,List(defd)))
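If you prefer to stay in PySpark, a rough sketch of the same idea (a type check in place of the Scala pattern match) could look like the following; it assumes an existing SparkSession named spark and the names are illustrative only:

import re

# Hypothetical PySpark version of the RDD approach: dispatch on the value's type
rdd1 = spark.sparkContext.parallelize([(1, "abc"), (2, ["abcd"])])

def replace_abc(value):
    # Strings are replaced directly; lists are replaced element by element
    if isinstance(value, str):
        return re.sub("abc", "def", value)
    return [re.sub("abc", "def", s) for s in value]

rdd2 = rdd1.map(lambda x: (x[0], replace_abc(x[1])))
print(rdd2.take(2))  # [(1, 'def'), (2, ['defd'])]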
Let me know if it helps!!
Upvotes: 0