Markus

Reputation: 3782

How to migrate a particular piece of code from Spark 1.6.2 to Spark 2.2.0?

I need to port my Spark 1.6.2 code to Spark 2.2.0 in Java.

 DataFrame eventsRaw = sqlContext.sql("SELECT * FROM my_data");
 Row[] rddRows = eventsRaw.collect();
 for (int rowIdx = 0; rowIdx < rddRows.length; ++rowIdx)
 {
     Map<String, String> myProperties = new HashMap<>();
     myProperties.put("startdate", rddRows[rowIdx].get(1).toString());
     JEDIS.hmset("PK:" + rddRows[rowIdx].get(0).toString(), myProperties); // JEDIS is a Redis client for Java
 }

As far as I understand, there is no DataFrame in the Spark 2.2.0 Java API, only Dataset. However, if I substitute DataFrame with Dataset, then eventsRaw.collect() returns Object[] instead of Row[]. Then get(1) is marked in red and the code does not compile.

How can I correctly do it?

Upvotes: 0

Views: 142

Answers (1)

Alper t. Turker

Reputation: 35229

In Spark 2.x, DataFrame (Scala) is just a type alias for Dataset<Row>, so in Java you use Dataset<Row> directly:

SparkSession spark;

...

Dataset<Row> eventsRaw = spark.sql("SELECT * FROM my_data");

but instead of collect you should rather use foreach (with a lazy singleton connection):

eventsRaw.foreach(
   (ForeachFunction<Row>) row -> ... // replace ... with appropriate logic
);
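The "lazy singleton connection" hint can be sketched in plain Java, independent of Spark. The idea is that the holder's static field is initialized on first use inside each executor JVM, so the (non-serializable) connection object is never captured by the closure that Spark ships to the cluster. The `RedisConnection` class below is a stand-in for a real client such as Jedis, assumed here purely for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a lazy singleton connection holder (assumed names, not a real API).
class ConnectionHolder {

    // Stand-in for a real client such as Jedis; records writes in memory
    // so the sketch is self-contained.
    static class RedisConnection {
        final Map<String, Map<String, String>> store = new HashMap<>();

        void hmset(String key, Map<String, String> props) {
            store.put(key, new HashMap<>(props));
        }
    }

    private static RedisConnection instance;

    // Lazily create exactly one connection per JVM; synchronized so that
    // concurrent tasks in the same executor do not race on initialization.
    static synchronized RedisConnection get() {
        if (instance == null) {
            instance = new RedisConnection();
        }
        return instance;
    }
}
```

Inside the foreach lambda one would then call something like `ConnectionHolder.get().hmset("PK:" + row.get(0), myProperties)`, so every task running in the same executor reuses the same connection instead of serializing one from the driver.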

or foreachPartition (initialize one connection per partition):

eventsRaw.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
   ... // rows is an Iterator<Row>; replace ... with appropriate logic
});

Upvotes: 2
