Liangpi
Liangpi

Reputation: 607

Spark Dataframe :How to add a index Column : Aka Distributed Data Index

I read data from a csv file ,but don't have index.

I want to add a column from 1 to row's number.

What should I do,Thanks (scala)

Upvotes: 56

Views: 162797

Answers (10)

kartik lohate
kartik lohate

Reputation: 1

With the help of udf function it takes less time at comparison to row_number or others.

   val df2 = df.withColumn("swampIdx",lit(1))
        var tem = 0;
        val incIdx = (i:Int) =>{
          tem += i;
          tem;
        }
        val udfFun = udf(incIdx)
        df2.withColumn("newIdx",udfFun(col("swampIdx"))).drop("swampIdx")

Upvotes: 0

Ramineni Ravi Teja
Ramineni Ravi Teja

Reputation: 3906

The monotonically_increasing_id() function generates monotonically increasing 64-bit integers. The generated id numbers are guaranteed to be increasing and unique, but they are not guaranteed to be consecutive.

from pyspark.sql.functions import *

df_with_increasing_id = df.withColumn("monotonically_increasing_id", monotonically_increasing_id())
df_with_increasing_id.show()

we get the following results:

+-----+---+---------------------------+
| Name|Age|monotonically_increasing_id|
+-----+---+---------------------------+
|Alice| 10|                 8589934592|
|Susan| 12|                25769803776|
+-----+---+---------------------------+

The row_number() function generates numbers that are consecutive.

Combine this with monotonically_increasing_id() to generate two columns of numbers that can be used to identify data entries.

from pyspark.sql.functions import *
from pyspark.sql.window import *

window = Window.orderBy(col('monotonically_increasing_id'))
df_with_consecutive_increasing_id = df_with_increasing_id.withColumn('increasing_id', row_number().over(window))
df_with_consecutive_increasing_id.show()

we get the following results:

+-----+---+---------------------------+-------------+
| Name|Age|monotonically_increasing_id|increasing_id|
+-----+---+---------------------------+-------------+
|Alice| 10|                 8589934592|            1|
|Susan| 12|                25769803776|            2|
+-----+---+---------------------------+-------------+

Upvotes: 0

anonymous
anonymous

Reputation: 1

Don't use this monotonically_increasing_id() it will become nightmare for you, data skewness will happens. Happened for me.

Upvotes: 0

nate
nate

Reputation: 1244

For SparkR:

(Assuming sdf is some sort of spark data frame)

sdf<- withColumn(sdf, "row_id", SparkR:::monotonically_increasing_id())

Upvotes: -1

Sequinex
Sequinex

Reputation: 671

How to get a sequential id column id[1, 2, 3, 4...n]:

from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

df_with_seq_id = df.withColumn('index_column_name', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)

Note that row_number() starts at 1, therefore subtract by 1 if you want 0-indexed column

Upvotes: 26

Abhi
Abhi

Reputation: 6568

If you require a unique sequence number for each row, I have a slightly different approach, where a static column is added and is used to compute the row number using that column.

val srcData = spark.read.option("header","true").csv("/FileStore/sample.csv")
srcData.show(5)

+--------+--------------------+
|     Job|                Name|
+--------+--------------------+
|Morpheus|       HR Specialist|
|   Kayla|              Lawyer|
|  Trisha|          Bus Driver|
|  Robert|Elementary School...|
|    Ober|               Judge|
+--------+--------------------+

val srcDataModf = srcData.withColumn("sl_no",lit("1"))
val windowSpecRowNum =  Window.partitionBy("sl_no").orderBy("sl_no")

srcDataModf.withColumn("row_num",row_number.over(windowSpecRowNum)).drop("sl_no").select("row_num","Name","Job")show(5)

+-------+--------------------+--------+
|row_num|                Name|     Job|
+-------+--------------------+--------+
|      1|       HR Specialist|Morpheus|
|      2|              Lawyer|   Kayla|
|      3|          Bus Driver|  Trisha|
|      4|Elementary School...|  Robert|
|      5|               Judge|    Ober|
+-------+--------------------+--------+

Upvotes: 0

Ram Ghadiyaram
Ram Ghadiyaram

Reputation: 29165

NOTE : Above approaches doesn't give a sequence number, but it does give increasing id.

Simple way to do that and ensure the order of indexes is like below.. zipWithIndex.

Sample data.

+-------------------+
|               Name|
+-------------------+
|     Ram Ghadiyaram|
|        Ravichandra|
|              ilker|
|               nick|
|             Naveed|
|      Gobinathan SP|
|Sreenivas Venigalla|
|     Jackela Kowski|
|   Arindam Sengupta|
|            Liangpi|
|             Omar14|
|        anshu kumar|
+-------------------+

    package com.example

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}

/**
  * DistributedDataIndex : Program to index an RDD  with
  */
object DistributedDataIndex extends App with Logging {

  val spark = builder
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  val df = spark.sparkContext.parallelize(
    Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick"
      , "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski", "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
    )).toDF("Name")
  df.show
  logInfo("addColumnIndex here")
  // Add index now...
  val df1WithIndex = addColumnIndex(df)
    .withColumn("monotonically_increasing_id", monotonically_increasing_id)
  df1WithIndex.show(false)

  /**
    * Add Column Index to dataframe to each row
    */
  def addColumnIndex(df: DataFrame) = {
    spark.sqlContext.createDataFrame(
      df.rdd.zipWithIndex.map {
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      },
      // Create schema for index column
      StructType(df.schema.fields :+ StructField("index", LongType, false)))
  }
}

Result :

+-------------------+-----+---------------------------+
|Name               |index|monotonically_increasing_id|
+-------------------+-----+---------------------------+
|Ram Ghadiyaram     |0    |0                          |
|Ravichandra        |1    |8589934592                 |
|ilker              |2    |8589934593                 |
|nick               |3    |17179869184                |
|Naveed             |4    |25769803776                |
|Gobinathan SP      |5    |25769803777                |
|Sreenivas Venigalla|6    |34359738368                |
|Jackela Kowski     |7    |42949672960                |
|Arindam Sengupta   |8    |42949672961                |
|Liangpi            |9    |51539607552                |
|Omar14             |10   |60129542144                |
|anshu kumar        |11   |60129542145                |
+-------------------+-----+---------------------------+

Upvotes: 15

Shantanu Sharma
Shantanu Sharma

Reputation: 101

As Ram said, zippedwithindex is better than monotonically increasing id, id you need consecutive row numbers. Try this (PySpark environment):

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))

where original_dataframe is the dataframe you have to add index on and row_with_index is the new schema with the column index which you can write as

row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)

Here, calendar_date, year_week_number, year_period_number and realization were the columns of my original dataframe. You can replace the names with the names of your columns. index is the new column name you had to add for the row numbers.

Upvotes: 4

anshu kumar
anshu kumar

Reputation: 747

monotonically_increasing_id - The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.

"I want to add a column from 1 to row's number."

Let say we have the following DF

+--------+-------------+-------+
| userId | productCode | count |
+--------+-------------+-------+
|     25 |        6001 |     2 |
|     11 |        5001 |     8 |
|     23 |         123 |     5 |
+--------+-------------+-------+

To generate the IDs starting from 1

val w = Window.orderBy("count")
val result = df.withColumn("index", row_number().over(w))

This would add an index column ordered by increasing value of count.

+--------+-------------+-------+-------+
| userId | productCode | count | index |
+--------+-------------+-------+-------+
|     25 |        6001 |     2 |     1 |
|     23 |         123 |     5 |     2 |
|     11 |        5001 |     8 |     3 |
+--------+-------------+-------+-------+

Upvotes: 48

Omar14
Omar14

Reputation: 2055

With Scala you can use:

import org.apache.spark.sql.functions._ 

df.withColumn("id",monotonicallyIncreasingId)

You can refer to this exemple and scala docs.

With Pyspark you can use:

from pyspark.sql.functions import monotonically_increasing_id 

df_index = df.select("*").withColumn("id", monotonically_increasing_id())

Upvotes: 81

Related Questions