MagicTaly

Reputation: 45

Scala/Spark SQL Array[row] to Array(Array(values))

First, I should say that I am very new to Scala and am having trouble with basic format conversions. I hope to get better at functional programming in the near future, so sorry if this is a dumb question.

Using Spark SQL, I run a query and store the results in a variable called "probesGroupby":

  val probesGroupby = sqlContext.sql("SELECT id_counter as id_counter, co_mac as co_mac, ts_timestamp as ts_timestamp, max(qt_rssi) as qt_rssi, count(*) as qt_tracks " +
                                     " FROM probes GROUP BY id_counter, co_mac, ts_timestamp")

So far, so good. After this, I need to write the data into an InfluxDB database, and the API demands this format:

val probeRequest= Series("probeRequest",
  Array("id_counter","co_mac","time","qt_rssi","qt_tracks"),
  Array(
    Array(row[0],row[1],row[2],row[3], row[4]),
    Array(row[0],row[1],row[2],row[3], row[4]),
    Array(row[0],row[1],row[2],row[3], row[4]),
    ...
  )
)
assert(None == client.writeSeries(Array(probeRequest)))

How can I create the variable "probeRequest" in this format, so that I have one Array of values for each Row returned by the query? I have tried a few things, but none of them seem to work :(

Thank you in advance,

Upvotes: 2

Views: 7915

Answers (2)

zero323

Reputation: 330063

You'll still have to wrap the result in Series, but otherwise it is as simple as this:

probesGroupby.map(_.toSeq.toArray).collect

Or, if you prefer a more explicit approach, you can use pattern matching:

import org.apache.spark.sql.Row

// rdd stands for the RDD[Row] holding the query result
rdd.map { case Row(idCounter, coMac, time, qtRssi, qtTracks) =>
    Array(idCounter, coMac, time, qtRssi, qtTracks)
}.collect()
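
To illustrate the wrapping step, here is a minimal sketch. The real Series class comes from the InfluxDB client used in the question, and its exact signature is not shown in this thread, so the Series case class below is a hypothetical stand-in that takes the same three arguments. The object name WrapRows, the helper toPoints and the sample rows are made up for illustration as well.

```scala
import org.apache.spark.sql.Row

// Hypothetical stand-in for the InfluxDB client's Series type (assumption):
// a series name, the column names, and one Array of values per row.
case class Series(name: String, columns: Array[String], points: Array[Array[Any]])

object WrapRows {
  // Turn each collected Row into a plain Array of its values.
  def toPoints(rows: Array[Row]): Array[Array[Any]] =
    rows.map(_.toSeq.toArray)

  def main(args: Array[String]): Unit = {
    // Made-up rows standing in for the collected query result.
    val rows = Array(
      Row(1, "aa:bb:cc", 1000L, -70, 2L),
      Row(2, "dd:ee:ff", 2000L, -65, 5L)
    )

    val probeRequest = Series(
      "probeRequest",
      Array("id_counter", "co_mac", "time", "qt_rssi", "qt_tracks"),
      toPoints(rows)
    )

    // Print one line per point to inspect the payload.
    probeRequest.points.foreach(p => println(p.mkString(", ")))
  }
}
```

With the real client, the value built here would presumably be passed to client.writeSeries as in the question.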

Upvotes: 1

Eugene Zhulenev

Reputation: 9734

First, collect the data from the RDD into an Array on the driver, then transform each individual Row. If you know the column types, you can use getInt, getLong, etc. instead of plain get:

val probesGroupby: RDD[Row] = ...

val payload: Array[Array[Any]] = probesGroupby.collect().map { row =>
   val idCounter = row.get(0)
   val coMac = row.get(1)
   val time = row.get(2)
   val qtRssi = row.get(3)
   val qtTracks = row.get(4)
   Array(idCounter, coMac, time, qtRssi, qtTracks)
   // or just: row.toSeq.toArray
}

val probeRequest= Series("probeRequest",
  Array("id_counter","co_mac","time","qt_rssi","qt_tracks"),
  payload,
  ...
 )
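
As a sketch of the typed-getter variant mentioned above: the column types are not stated in the thread, so the ones used here (Int, String, Long, Int, Long) are assumptions, apart from count(*), which Spark SQL returns as a Long. The object name TypedRows, the helper toPoint and the sample row are hypothetical.

```scala
import org.apache.spark.sql.Row

object TypedRows {
  // Read each column with a typed getter instead of the untyped get.
  // Assumed column types (not confirmed by the thread): Int, String, Long, Int, Long.
  def toPoint(row: Row): Array[Any] = Array[Any](
    row.getInt(0),    // id_counter
    row.getString(1), // co_mac
    row.getLong(2),   // ts_timestamp
    row.getInt(3),    // qt_rssi
    row.getLong(4)    // qt_tracks, count(*) is a Long in Spark SQL
  )

  def main(args: Array[String]): Unit = {
    // Made-up row standing in for one element of probesGroupby.collect().
    val sample = Row(1, "aa:bb:cc", 1000L, -70, 2L)
    println(toPoint(sample).mkString(", "))
  }
}
```

If a column's actual type differs from the getter used, the getter throws a ClassCastException at runtime, so it is worth checking the schema of the query result before choosing them.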

Upvotes: 1
