mobupu

Reputation: 255

Selecting a column from a dataset's row

I'd like to loop over a Spark dataset and save specific values in a Map depending on the characteristics of each row. I'm new to Spark and Scala, so I've included a simple example of what I'm trying to do in Python.

Minimal working example in Python:

mydict = dict()
for row in data:
    if row['name'] == "Yan":
        mydict[row['id']] = row['surname']
    else:
        mydict[row['id']] = "Random lad"

Here, data is a (big) Spark dataset of type org.apache.spark.sql.Dataset[org.apache.spark.sql.Row].

Do you know the Spark or Scala way of doing it?

Upvotes: 0

Views: 1254

Answers (3)

moe

Reputation: 1806

You cannot loop over the contents of a Dataset because they are not accessible on the machine running this code; they are scattered across (possibly many) different worker nodes. That is a fundamental concept of distributed execution engines like Spark.

Instead, you have to manipulate your data in a functional way (map, filter, reduce, and similar operations that are distributed to the workers) or a declarative way (SQL queries that are executed on the workers).
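For instance, the declarative route could look like this (a sketch, assuming your Dataset is registered as a temporary view named people, a name chosen here just for illustration):

// Register the Dataset as a view, then express the logic as SQL
// that Spark executes on the workers.
data.createOrReplaceTempView("people")
val result = spark.sql(
  """SELECT id,
    |       CASE WHEN name = 'Yan' THEN surname ELSE 'Random lad' END AS value
    |FROM people""".stripMargin)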

To achieve your goal you could run a map over your data that checks whether the name equals "Yan" and go on from there. After this transformation you can collect your DataFrame and turn it into a Map, as sketched below.
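A minimal sketch of that approach (assuming data has the string columns id, name and surname from the question, and that spark.implicits._ is in scope, as in spark-shell):

import org.apache.spark.sql.functions.when

// Compute the value on the workers, then collect the (id, value)
// pairs to the driver and turn them into a plain Scala Map.
val mydict: Map[String, String] = data
  .select($"id", when($"name" === "Yan", $"surname").otherwise("Random lad").as("value"))
  .as[(String, String)] // needs spark.implicits._ for the encoder and $ syntax
  .collect()
  .toMap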

You should also reconsider whether Spark and this map-building approach fit your use case: it seems you want to create an entry in mydict for each element of data. That means your data is either small enough that you don't really need Spark, or the collect will probably fail because the data does not fit in your driver's memory.

Upvotes: 1

koiralo

Reputation: 23099

Here is a simple way to do it, but be careful with collect(), since it collects the data to the driver; the data must fit in the driver's memory.

I don't recommend doing this.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.when
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

val df: DataFrame = Seq(
  ("1", "Yan", "surname1"),
  ("2", "Yan1", "surname2"),
  ("3", "Yan", "surname3"),
  ("4", "Yan2", "surname4")
).toDF("id", "name", "surname")

// Compute the new value on the workers, then collect the
// (id, newName) pairs into a Map on the driver.
val myDict = df.withColumn("newName", when($"name" === "Yan", $"surname").otherwise("RandomName"))
  .rdd.map(row => (row.getAs[String]("id"), row.getAs[String]("newName")))
  .collectAsMap()

myDict.foreach(println)

Output:

(2,RandomName)
(1,surname1)
(4,RandomName)
(3,surname3)

Upvotes: 1

chlebek

Reputation: 2451

I think you are looking for something like this. If your final df is not big, you can collect it and store it as a map, as in the sketch after the output below.

scala> df.show()
+---+----+-------+
| id|name|surname|
+---+----+-------+
|  1| Yan| abc123|
|  2| Abc| def123|
+---+----+-------+

scala> df.select('id, when('name === "Yan", 'surname).otherwise("Random lad")).toDF("K","V").show()
+---+----------+
|  K|         V|
+---+----------+
|  1|    abc123|
|  2|Random lad|
+---+----------+
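If that result is small enough to collect, a possible continuation (a sketch, assuming both columns are strings) is:

// Collect the (K, V) pairs into a Scala Map on the driver;
// only safe when the result fits in driver memory.
val myMap: Map[String, String] = df
  .select('id, when('name === "Yan", 'surname).otherwise("Random lad"))
  .toDF("K", "V")
  .as[(String, String)]
  .collect()
  .toMap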

Upvotes: 1
