First, I want to say that I am very new to Scala and I am having problems with basic conversions. I hope to get better at functional programming soon, so sorry if this is a dumb question.
Using Spark SQL, I run a query and store the results in a variable called "probesGroupby":
val probesGroupby = sqlContext.sql("SELECT id_counter as id_counter, co_mac as co_mac, ts_timestamp as ts_timestamp, max(qt_rssi) as qt_rssi, count(*) as qt_tracks " +
" FROM probes GROUP BY id_counter, co_mac, ts_timestamp")
Everything works up to this point. Next, I need to write the data into an InfluxDB database, and the client API expects this format:
val probeRequest= Series("probeRequest",
Array("id_counter","co_mac","time","qt_rssi","qt_tracks"),
Array(
    Array(row(0), row(1), row(2), row(3), row(4)),
    Array(row(0), row(1), row(2), row(3), row(4)),
    Array(row(0), row(1), row(2), row(3), row(4)),
...
)
)
assert(None == client.writeSeries(Array(probeRequest)))
How can I create the variable "probeRequest" in this format, with one inner Array of values for each Row returned by the query? I have tried a few things, but nothing seems to work :(
Thank you in advance,
You'll still have to wrap the result in Series, but otherwise it is as simple as this:
probesGroupby.rdd.map(_.toSeq.toArray).collect()
or, if you prefer a more explicit approach, you can use pattern matching:
import org.apache.spark.sql.Row

probesGroupby.rdd.map { case Row(idCounter, coMac, time, qtRssi, qtTracks) =>
  Array(idCounter, coMac, time, qtRssi, qtTracks)
}.collect()
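A minimal sketch of the remaining wrapping step, written in plain Scala so it runs without Spark. `Series` below is a hypothetical stand-in modeled on the three-argument call in the question (name, column names, points), not the InfluxDB client's actual class, and the sample rows are made up; in real code `rows` would be the Array of Arrays returned by the `collect` above.

```scala
// Hypothetical stand-in for the client's Series type, shaped like the call in
// the question: a series name, column names, and one Array of values per point.
final case class Series(name: String, columns: Array[String], points: Array[Array[Any]])

object WrapInSeries {
  val columns: Array[String] =
    Array("id_counter", "co_mac", "time", "qt_rssi", "qt_tracks")

  // Wraps already-collected rows (one Array[Any] per Row, in SELECT order).
  def toSeries(rows: Array[Array[Any]]): Series = {
    // Each point must carry exactly one value per column name.
    require(rows.forall(_.length == columns.length), "row/column count mismatch")
    Series("probeRequest", columns, rows)
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample rows standing in for the collected query result.
    val rows: Array[Array[Any]] = Array(
      Array(1, "aa:bb:cc:dd:ee:ff", 1420070400000L, -42, 3L),
      Array(2, "11:22:33:44:55:66", 1420070460000L, -67, 1L)
    )
    val probeRequest = toSeries(rows)
    println(s"${probeRequest.name}: ${probeRequest.points.length} points")
  }
}
```

The `require` check is optional, but it catches a mismatch between the SELECT list and the column names before anything is sent to the database.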
First, collect the data from the RDD into an Array on the driver, and then transform each individual Row. If you know the column types, you can use getInt, getLong, etc. instead of just get.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val probesGroupby: RDD[Row] = ...
val payload: Array[Array[Any]] = probesGroupby.collect().map { row =>
  val idCounter = row.get(0)
  val coMac = row.get(1)
  val time = row.get(2)
  val qtRssi = row.get(3)
  val qtTracks = row.get(4)
  Array(idCounter, coMac, time, qtRssi, qtTracks)
  // or just: row.toSeq.toArray
}
val probeRequest = Series("probeRequest",
  Array("id_counter", "co_mac", "time", "qt_rssi", "qt_tracks"),
  payload
)
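If you do know the column types, a typed version of the same transformation might look like the plain-Scala sketch below. `Probe` and `TypedRows` are hypothetical names, a `Seq[Any]` stands in for a Spark `Row` (with Spark you would start from `row.toSeq`, or call `getInt`/`getLong`/`getString` directly), and the column types are assumptions, except that `count(*)` comes back as a `Long`.

```scala
// Hypothetical typed view of one result row. The column types are assumptions;
// check the real schema of the query result before relying on them.
final case class Probe(idCounter: Int, coMac: String, time: Long, qtRssi: Int, qtTracks: Long) {
  // Back to the untyped shape the Series points expect, in column order.
  def toPoint: Array[Any] = Array(idCounter, coMac, time, qtRssi, qtTracks)
}

object TypedRows {
  // `values` stands in for row.toSeq on a Spark Row. The typed pattern throws
  // a MatchError when a column does not have the assumed type.
  def parse(values: Seq[Any]): Probe = values match {
    case Seq(id: Int, mac: String, ts: Long, rssi: Int, tracks: Long) =>
      Probe(id, mac, ts, rssi, tracks)
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample data standing in for the collected rows.
    val collected: Array[Seq[Any]] = Array(
      Seq(1, "aa:bb:cc:dd:ee:ff", 1420070400000L, -42, 3L)
    )
    val payload: Array[Array[Any]] = collected.map(parse).map(_.toPoint)
    println(payload.head.mkString(", "))
  }
}
```

Going through a typed case class costs an extra step, but a wrong assumption about a column type fails at the point of conversion instead of surfacing later as a malformed write.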