lee

Reputation: 159

How can I update one column value in an RDD[Row]?

I use Scala with Spark, and I want to update one column value in an RDD. My data format is like this:

[510116,8042,1,8298,20170907181326,1,3,lineno805]
[510116,8042,1,8152,20170907182101,1,3,lineno805]
[510116,8042,1,8154,20170907164311,1,3,lineno805]
[510116,8042,1,8069,20170907165031,1,3,lineno805]
[510116,8042,1,8061,20170907170254,1,3,lineno805]
[510116,8042,1,9906,20170907171417,1,3,lineno805]
[510116,8042,1,8295,20170907174734,1,3,lineno805]

My Scala code is like this:

 val getSerialRdd: RDD[Row]=……

I want to update the column that contains data such as 20170907181326, so that the data looks like this:

[510116,8042,1,8298,2017090718,1,3,lineno805]
[510116,8042,1,8152,2017090718,1,3,lineno805]
[510116,8042,1,8154,2017090716,1,3,lineno805]
[510116,8042,1,8069,2017090716,1,3,lineno805]
[510116,8042,1,8061,2017090717,1,3,lineno805]
[510116,8042,1,9906,2017090717,1,3,lineno805]
[510116,8042,1,8295,2017090717,1,3,lineno805]

and the output should still be of type RDD[Row].

How can I do this?

Upvotes: 0

Views: 6052

Answers (2)

Rubber Duck

Reputation: 3723

In some cases you might want to update a row while preserving its schema:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

// Replace the i-th field of the row with value a, keeping the row's original schema
def update(r: Row, i: Int, a: Any): Row = {

    val s: Array[Any] = r
      .toSeq
      .toArray
      .updated(i, a)

    new GenericRowWithSchema(s, r.schema)
}

// e.g. truncate the timestamp column (index 4) to its first 10 characters
rdd.map(r => update(r, 4, r.getString(4).take(10))).collect().foreach(println)

Upvotes: 3

akuiper

Reputation: 214957

You can define an update method like this to update a field in the Row:

import org.apache.spark.sql.Row

def update(r: Row): Row = {
    val s = r.toSeq
    // truncate the timestamp field (index 4) to its first 10 characters
    Row.fromSeq((s.take(4) :+ s(4).asInstanceOf[String].take(10)) ++ s.drop(5))
}

rdd.map(update(_)).collect

//res13: Array[org.apache.spark.sql.Row] = 
//       Array([510116,8042,1,8298,2017090718,1,3,lineno805], 
//             [510116,8042,1,8152,2017090718,1,3,lineno805], 
//             [510116,8042,1,8154,2017090716,1,3,lineno805], 
//             [510116,8042,1,8069,2017090716,1,3,lineno805], 
//             [510116,8042,1,8061,2017090717,1,3,lineno805], 
//             [510116,8042,1,9906,2017090717,1,3,lineno805], 
//             [510116,8042,1,8295,2017090717,1,3,lineno805])

A simpler approach would be to use the DataFrame API and the substring function:

1) Create a data frame from the rdd:

val df = spark.createDataFrame(rdd, rdd.take(1)(0).schema)
// df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 6 more fields]
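If the rows in the RDD don't carry a schema (so rdd.take(1)(0).schema would be null), you could build a StructType explicitly instead. A minimal sketch, assuming all eight fields are strings and using the hypothetical column names _c0 .. _c7:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// hypothetical explicit schema: eight string columns named _c0 .. _c7
val schema = StructType((0 to 7).map(i => StructField(s"_c$i", StringType)))
val df = spark.createDataFrame(rdd, schema)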

2) Use substring to transform the column:

import org.apache.spark.sql.functions.substring
import spark.implicits._  // for the $"col" column syntax

df.withColumn("_c4", substring($"_c4", 0, 10)).show
+------+----+---+----+----------+---+---+---------+
|   _c0| _c1|_c2| _c3|       _c4|_c5|_c6|      _c7|
+------+----+---+----+----------+---+---+---------+
|510116|8042|  1|8298|2017090718|  1|  3|lineno805|
|510116|8042|  1|8152|2017090718|  1|  3|lineno805|
|510116|8042|  1|8154|2017090716|  1|  3|lineno805|
|510116|8042|  1|8069|2017090716|  1|  3|lineno805|
|510116|8042|  1|8061|2017090717|  1|  3|lineno805|
|510116|8042|  1|9906|2017090717|  1|  3|lineno805|
|510116|8042|  1|8295|2017090717|  1|  3|lineno805|
+------+----+---+----+----------+---+---+---------+

3) Converting the data frame back to an RDD is easy:

val getSerialRdd = df.withColumn("_c4", substring($"_c4", 0, 10)).rdd
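Since df.rdd returns an org.apache.spark.rdd.RDD[Row], getSerialRdd keeps the type asked for in the question. A quick sanity check, just for illustration:

// getSerialRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
getSerialRdd.collect().foreach(println)
// [510116,8042,1,8298,2017090718,1,3,lineno805]
// ...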

Upvotes: 3
