Reputation: 784
Here is a sample scenario: we have real-time data recorded in Cassandra, and we want to aggregate the data over different time ranges. The code I wrote looks like this:
val timeRanges = getTimeRanges(report)
timeRanges.foreach { timeRange =>
  val (timestampStart, timestampEnd) = timeRange
  val query = _sc.get.cassandraTable(report.keyspace, utilities.Helper.makeStringValid(report.scope))
    .where("TIMESTAMP > ?", timestampStart)
    .where("VALID_TIMESTAMP <= ?", timestampEnd)
  // ......do the aggregation work....
}
The issue with this code is that the aggregation for each time range does not run in parallel; the ranges are processed one after another. My question is: how can I parallelize the aggregation work, given that an RDD can't be used inside another RDD or a Future? Is there any way to parallelize the work, or can we not use the Spark connector here?
Upvotes: 1
Views: 433
Reputation: 784
In the end, we used union to combine the RDDs for each time range into a single RDD, so that they are processed in parallel.
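For anyone who wants to see the shape of this, here is a minimal sketch of the union approach, not our exact code. It assumes the time range bounds are java.util.Date values, reuses the column names from the question, and uses a plain row count as a stand-in for the real aggregation. The function name countPerRange and its parameters are made up for illustration.

```scala
import java.util.Date

import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper: builds one filtered RDD per time range, tags each row
// with the index of its range, unions everything, and aggregates in one job.
def countPerRange(sc: SparkContext,
                  keyspace: String,
                  table: String,
                  timeRanges: Seq[(Date, Date)]): RDD[(Int, Long)] = {
  val perRange: Seq[RDD[(Int, CassandraRow)]] =
    timeRanges.zipWithIndex.map { case ((start, end), idx) =>
      sc.cassandraTable(keyspace, table)
        .where("TIMESTAMP > ?", start)
        .where("VALID_TIMESTAMP <= ?", end)
        .map(row => (idx, row)) // remember which range the row came from
    }

  // A single union RDD lets Spark schedule the partitions of all ranges together.
  sc.union(perRange)
    .mapValues(_ => 1L)  // stand-in aggregation: count rows per range
    .reduceByKey(_ + _)
}
```

Nothing is executed until an action such as collect() is called on the result, at which point all ranges are read and aggregated within the same job instead of one job per range.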
Upvotes: 0
Reputation: 16576
Use the joinWithCassandraTable function. This allows you to use the data from one RDD to access C* and pull records just like in your example.
joinWithCassandraTable utilizes the Java driver to execute a single query for every partition required by the source RDD, so no unneeded data will be requested or serialized. This means a join between any RDD and a Cassandra table can be performed without doing a full table scan. When performed between two Cassandra tables which share the same partition key, this will not require movement of data between machines. In all cases this method will use the source RDD's partitioning and placement for data locality.
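A rough sketch of what this could look like follows. It only helps if the table's partition key lets you enumerate the partitions you need, so it assumes a hypothetical table whose partition key is a single text column named bucket (for example one bucket per day). The case class BucketKey, the function countPerBucket, and the row count used as the aggregation are all illustrative assumptions, not taken from the question.

```scala
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical key type: the field name must match the partition key column.
case class BucketKey(bucket: String)

// Hypothetical helper: fetches only the listed partitions and counts their rows.
def countPerBucket(sc: SparkContext,
                   keyspace: String,
                   table: String,
                   buckets: Seq[String]): RDD[(String, Long)] = {
  sc.parallelize(buckets)
    .map(BucketKey(_))
    .joinWithCassandraTable(keyspace, table) // one query per requested partition
    .map { case (key, _) => (key.bucket, 1L) } // stand-in aggregation: row count
    .reduceByKey(_ + _)
}
```

Because the source RDD is distributed, the per-partition queries are issued from the executors in parallel, which is what the sequential foreach in the question was missing.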
Upvotes: 1