bp2010

Reputation: 2472

pyspark: arrays_zip equivalent in Spark 2.3

How do I write an equivalent of the arrays_zip function in Spark 2.3?

Source code from Spark 2.4

def arrays_zip(*cols):
    """
    Collection function: Returns a merged array of structs in which the N-th struct contains all
    N-th values of input arrays.

    :param cols: columns of arrays to be merged.

    >>> from pyspark.sql.functions import arrays_zip
    >>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
    >>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect()
    [Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.arrays_zip(_to_seq(sc, cols, _to_java_column)))

How can I achieve the same in PySpark?

Upvotes: 7

Views: 5968

Answers (3)

Sajad Norouzi

Reputation: 1920

You can simply use f.array; the only difference is that you later have to access the values by index rather than by column name.

from pyspark.sql import functions as f

df = df.withColumn('combined', f.array(f.col('col1'), f.col('col2'), f.col('col3')))
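
For example, a value can then be retrieved by position (a small usage sketch; first_value is just an illustrative alias):

# Access the combined array by index, not by field name
df.select(f.col('combined')[0].alias('first_value'))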

Upvotes: 0

Shaido

Reputation: 28392

You can use a UDF to obtain the same functionality as arrays_zip. Note that the column types need to be the same for this to work (in this case, IntegerType). If the column types differ, convert the columns to a common type before using the UDF (see the cast sketch after the definition below).

from pyspark.sql import functions as F
from pyspark.sql import types as T

def zip_func(*args):
    # Zip the input arrays element-wise; each resulting tuple
    # is serialized back to Spark as an inner array
    return list(zip(*args))

zip_udf = F.udf(zip_func, T.ArrayType(T.ArrayType(T.IntegerType())))
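
A sketch of that type conversion, assuming vals2 arrives as an array of strings:

# Hypothetical pre-processing step: cast an array<string> column to array<int>
df = df.withColumn('vals2', F.col('vals2').cast(T.ArrayType(T.IntegerType())))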

It can be used in the same way as arrays_zip, for example:

df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
df.select(zip_udf(df.vals1, df.vals2).alias('zipped')).collect()
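
On the example dataframe this should return something like:

[Row(zipped=[[1, 2], [2, 3], [3, 4]])]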

Upvotes: 1

Shubham Jain

Reputation: 5536

You can achieve this by creating a user-defined function (UDF):

import pyspark.sql.functions as f
import pyspark.sql.types as t

arrays_zip_ = f.udf(
    lambda x, y: list(zip(x, y)),
    t.ArrayType(t.StructType([
        # Choose the data types according to your requirements
        t.StructField("first", t.IntegerType()),
        t.StructField("second", t.StringType())
    ]))
)

df = spark.createDataFrame([(([1, 2, 3], ['2', '3', '4']))], ['first', 'second'])

Now the result with Spark <= 2.3, using the UDF:

df.select(arrays_zip_('first', 'second').alias('zipped')).show(2, False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+

And the result with the built-in arrays_zip in Spark 2.4:

df.select(f.arrays_zip('first', 'second').alias('zipped')).show(2, False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+
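
Since the UDF declares named struct fields, the zipped values can also be read back by field name (a small usage sketch; first_value is just an illustrative alias):

# Index into the zipped array, then access the struct field by name
df.select(arrays_zip_('first', 'second').alias('zipped')) \
  .select(f.col('zipped')[0]['first'].alias('first_value')) \
  .show()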

Upvotes: 1
