David Griffin

Reputation: 13927

DataFrame-ified zipWithIndex

I am trying to solve the age-old problem of adding a sequence number to a data set. I am working with DataFrames, and there appears to be no DataFrame equivalent to RDD.zipWithIndex. On the other hand, the following works more or less the way I want it to:

val origDF = sqlContext.load(...)    

val seqDF= sqlContext.createDataFrame(
    origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
    StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)

In my actual application, origDF won't be loaded directly out of a file -- it is going to be created by joining 2-3 other DataFrames together and will contain upwards of 100 million rows.

Is there a better way to do this? What can I do to optimize it?

Upvotes: 42

Views: 29769

Answers (9)

canberker

Reputation: 46

Spark Java API version:

I have implemented @Evgeny's solution for performing zipWithIndex on DataFrames in Java and wanted to share the code.

It also contains the improvements offered by @fylb in his solution. I can confirm for Spark 2.4 that execution fails when the values returned by spark_partition_id() do not start at 0 or do not increase sequentially. As this function is documented to be non-deterministic, one of these cases is very likely to occur; one example is triggered by increasing the partition count.

The Java implementation:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.*;

public static Dataset<Row> zipWithIndex(Dataset<Row> df, Long offset, String indexName) {
        Dataset<Row> dfWithPartitionId = df
                .withColumn("partition_id", spark_partition_id())
                .withColumn("inc_id", monotonically_increasing_id());

        Object partitionOffsetsObject = dfWithPartitionId
                .groupBy("partition_id")
                .agg(count(lit(1)).alias("cnt"), first("inc_id").alias("inc_id"))
                .orderBy("partition_id")
                .select(col("partition_id"), sum("cnt").over(Window.orderBy("partition_id")).minus(col("cnt")).minus(col("inc_id")).plus(lit(offset).alias("cnt")))
                .collect();
        Row[] partitionOffsetsArray = ((Row[]) partitionOffsetsObject);
        Map<Integer, Long> partitionOffsets = new HashMap<>();
        for (int i = 0; i < partitionOffsetsArray.length; i++) {
            partitionOffsets.put(partitionOffsetsArray[i].getInt(0), partitionOffsetsArray[i].getLong(1));
        }

        UserDefinedFunction getPartitionOffset = udf(
                (partitionId) -> partitionOffsets.get((Integer) partitionId), DataTypes.LongType
        );

        return dfWithPartitionId
                .withColumn("partition_offset", getPartitionOffset.apply(col("partition_id")))
                .withColumn(indexName, col("partition_offset").plus(col("inc_id")))
                .drop("partition_id", "partition_offset", "inc_id");
    }

Upvotes: 2

Paladin

Reputation: 630

I have modified @Tagar's version to run on Python 3.7 and wanted to share it:

from pyspark.sql.types import LongType, StructField, StructType

def dfZipWithIndex (df, offset=1, colName="rowId"):
    '''
        Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe
        and preserves the schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''

    new_schema = StructType(
                    [StructField(colName, LongType(), True)]      # new added field in front
                    + df.schema.fields                            # previous schema
                )

    zipped_rdd = df.rdd.zipWithIndex()

    # in Python 3 the (row, rowId) tuple is passed as a single argument,
    # so read its elements by index instead of unpacking it in the lambda
    new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0])))

    return spark.createDataFrame(new_rdd, new_schema)

Upvotes: 3

Tagar

Reputation: 14939

PySpark version:

from pyspark.sql.types import LongType, StructField, StructType

def dfZipWithIndex (df, offset=1, colName="rowId"):
    '''
        Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe
        and preserves the schema
        
        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''
    
    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )
    
    zipped_rdd = df.rdd.zipWithIndex()
    
    new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row)))

    return spark.createDataFrame(new_rdd, new_schema)

I also created a Jira to add this functionality to Spark natively: https://issues.apache.org/jira/browse/SPARK-23074

Upvotes: 8

Afik Bar

Reputation: 11

I've ported @canberker's suggestion to Python 3 (PySpark).

Also, instead of using a UDF with a hash map, I used a broadcast join, which slightly improved performance in my tests.

NOTE: This solution still suffers from gaps due to empty partitions.

from pyspark.sql import Window
from pyspark.sql import functions as F


def zip_with_index(df, offset: int = 1, index_name: str = "id"):
    df_with_partition_id = (
        df
        .withColumn("partition_id", F.spark_partition_id())
        .withColumn("inc_id", F.monotonically_increasing_id())
    )
    partition_offsets_df = (
        df_with_partition_id
        .groupBy("partition_id")
        .agg(F.count(F.lit(1)).alias("cnt"), F.first("inc_id").alias("inc_id"))
        .orderBy("partition_id")
        .select(
            F.col("partition_id"),
            (
                F.sum("cnt").over(Window.orderBy("partition_id"))
                - F.col("cnt") - F.col("inc_id") + F.lit(offset)
            ).alias("partition_offset")
        )
    )

    res = (
        df_with_partition_id
        .join(partition_offsets_df.hint("broadcast"), on="partition_id")
        .withColumn(index_name, F.col("partition_offset") + F.col("inc_id"))
        .drop("partition_id", "partition_offset", "inc_id")
    )
    return res

Upvotes: 1

ebonnal

Reputation: 1167

Here is my proposal, the advantages of which are:

  • It does not involve any serialization/deserialization[1] of our DataFrame's InternalRows.
  • Its logic is minimal, relying only on RDD.zipWithIndex.

Its major downsides are:

  • It is impossible to use it directly from non-JVM APIs (pySpark, SparkR).
  • It has to be placed under the package org.apache.spark.sql.

imports:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.functions.lit
/**
  * Optimized Spark SQL equivalent of RDD.zipWithIndex.
  *
  * @param df the DataFrame to index
  * @param indexColName the name of the index column to add
  * @return `df` with a column named `indexColName` of consecutive unique ids.
  */
def zipWithIndex(df: DataFrame, indexColName: String = "index"): DataFrame = {
  import df.sparkSession.implicits._

  val dfWithIndexCol: DataFrame = df
    .drop(indexColName)
    .select(lit(0L).as(indexColName), $"*")

  val internalRows: RDD[InternalRow] = dfWithIndexCol
    .queryExecution
    .toRdd
    .zipWithIndex()
    .map {
      case (internalRow: InternalRow, index: Long) =>
        internalRow.setLong(0, index)
        internalRow
    }

  Dataset.ofRows(
    df.sparkSession,
    LogicalRDD(dfWithIndexCol.schema.toAttributes, internalRows)(df.sparkSession)
  )
}

[1]: i.e. no conversion between InternalRow's underlying byte arrays and GenericRow's underlying JVM object collections (Array[Any]).
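
For reference, a minimal usage sketch (the DataFrame and column names here are illustrative, and it assumes the code above is compiled under the org.apache.spark.sql package as noted):

// illustrative: df is any existing DataFrame
val indexed = zipWithIndex(df, indexColName = "row_index")
indexed.select("row_index").show(5)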

Upvotes: 1

fylb

Reputation: 699

@Evgeny, your solution is interesting. Notice that there is a bug when you have empty partitions: the array is missing those partition indexes (at least this is what happens to me with Spark 1.6), so I converted the array into a Map(partitionId -> offset).

Additionally, I adapted the source of monotonically_increasing_id so that "inc_id" becomes a simple per-partition counter (starting at 1 in the code below).

Here is an updated version:

import org.apache.spark.sql.catalyst.expressions.LeafExpression
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.catalyst.expressions.Nondeterministic
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratedExpressionCode
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.Window

case class PartitionMonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {

  /**
   * From org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID
   *
   * Record ID within each partition. By being transient, count's value is reset to 0 every time
   * we serialize and deserialize and initialize it.
   */
  @transient private[this] var count: Long = _

  override protected def initInternal(): Unit = {
    count = 1L // notice this starts at 1, not 0 as in org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID
  }

  override def nullable: Boolean = false

  override def dataType: DataType = LongType

  override protected def evalInternal(input: InternalRow): Long = {
    val currentCount = count
    count += 1
    currentCount
  }

  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val countTerm = ctx.freshName("count")
    ctx.addMutableState(ctx.JAVA_LONG, countTerm, s"$countTerm = 1L;")
    ev.isNull = "false"
    s"""
      final ${ctx.javaType(dataType)} ${ev.value} = $countTerm;
      $countTerm++;
    """
  }
}

object DataframeUtils {
  def zipWithIndex(df: DataFrame, offset: Long = 0, indexName: String = "index") = {
    // from https://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex)
    val dfWithPartitionId = df.withColumn("partition_id", spark_partition_id()).withColumn("inc_id", new Column(PartitionMonotonicallyIncreasingID()))

    // collect each partition size, create the offset pages
    val partitionOffsets: Map[Int, Long] = dfWithPartitionId
      .groupBy("partition_id")
      .agg(max("inc_id") as "cnt") // in each partition, count(inc_id) is equal to max(inc_id) (I don't know which one would be faster)
      .select(col("partition_id"), sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") + lit(offset) as "cnt")
      .collect()
      .map(r => (r.getInt(0) -> r.getLong(1)))
      .toMap

    def partition_offset(partitionId: Int): Long = partitionOffsets(partitionId)
    val partition_offset_udf = udf(partition_offset _)
    // and re-number the index
    dfWithPartitionId
      .withColumn("partition_offset", partition_offset_udf(col("partition_id")))
      .withColumn(indexName, col("partition_offset") + col("inc_id"))
      .drop("partition_id")
      .drop("partition_offset")
      .drop("inc_id")
  }
}

Upvotes: 5

Evgenii Glotov

Reputation: 176

Since Spark 1.6 there is a function called monotonically_increasing_id().
It generates a new column with a unique 64-bit monotonic index for each row.
But the values aren't consecutive: each partition starts a new range, so we must calculate each partition's offset before using it.
Trying to provide an "RDD-free" solution, I ended up with some collect(), but it only collects the offsets, one value per partition, so it will not cause OOM.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = {
    val dfWithPartitionId = df.withColumn("partition_id", spark_partition_id()).withColumn("inc_id", monotonically_increasing_id())

    val partitionOffsets = dfWithPartitionId
        .groupBy("partition_id")
        .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
        .orderBy("partition_id")
        .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt" )
        .collect()
        .map(_.getLong(0))
        .toArray
        
     dfWithPartitionId
        .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
        .withColumn(indexName, col("partition_offset") + col("inc_id"))
        .drop("partition_id", "partition_offset", "inc_id")
}

This solution doesn't repack the original rows and doesn't repartition the original huge dataframe, so it is quite fast in the real world: 200 GB of CSV data (43 million rows with 150 columns) was read, indexed and written to Parquet in 2 minutes on 240 cores.
After testing my solution, I ran Kirk Broadhurst's solution and it was 20 seconds slower.
You may or may not want to use dfWithPartitionId.cache(), depending on the task.
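
For reference, a minimal usage sketch (assuming origDF from the question; the column names are illustrative):

// consecutive ids starting at 1 (the default offset) in a column named "index"
val indexedDF = zipWithIndex(origDF)

// or with explicit parameters
val indexedFromZero = zipWithIndex(origDF, offset = 0, indexName = "seq")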

Upvotes: 14

Kirk Broadhurst

Reputation: 28738

The following was posted on behalf of David Griffin (it was edited out of the question).

The all-singing, all-dancing dfZipWithIndex method. You can set the starting offset (which defaults to 1), the index column name (defaults to "id"), and place the column in the front or the back:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row


def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq())
          ++ ln._1.toSeq ++
        (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    StructType(
      (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]()) 
        ++ df.schema.fields ++ 
      (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
    )
  ) 
}
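
For example, a minimal usage sketch (assuming origDF from the question; column names are illustrative):

// index column "id" at the front, starting at 1 (the defaults)
val withIdInFront = dfZipWithIndex(origDF)

// index column "seq" appended as the last column, starting at 0
val withSeqAtBack = dfZipWithIndex(origDF, offset = 0, colName = "seq", inFront = false)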

Upvotes: 40

David Griffin

Reputation: 13927

Starting in Spark 1.5, Window expressions were added to Spark. Instead of having to convert the DataFrame to an RDD, you can now use org.apache.spark.sql.functions.row_number. Note that I found the dfZipWithIndex above to perform significantly better than the algorithm below. But I am posting it because:

  1. Someone else is going to be tempted to try this
  2. Maybe someone can optimize the expressions below

At any rate, here's what works for me:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions.{lit, row_number}

df.withColumn("row_num", row_number.over(Window.partitionBy(lit(1)).orderBy(lit(1))))

Note that I use lit(1) for both the partitioning and the ordering -- this puts everything into the same partition, and it seems to preserve the original ordering of the DataFrame, but I suppose it is also what slows it way down.

I tested it on a 4-column DataFrame with 7,000,000 rows, and the speed difference between this and the dfZipWithIndex above is significant (as I said, the RDD-based function is much, much faster).

Upvotes: 9
