Reputation: 175
The data in my data.csv file is:
07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:48:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:36:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
This is my code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ScalaApp {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "Program")
    // we take the raw data in CSV format and convert it into a tuple
    val data = sc.textFile("data.csv")
      .map(line => line.split(","))
      .map(GroupRecord => (GroupRecord(0), GroupRecord(1), GroupRecord(2),
        GroupRecord(3), GroupRecord(4), GroupRecord(5)))
    val numPurchases = data.count()
    val d1 = data.groupByKey(GroupRecord(2)) // here is the error
    println("No: " + numPurchases)
    println("Grouped Data" + d1)
  }
}
I just want the same data grouped by source IP (2nd column) and ordered by time (1st column). So my required output is:
07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
But I have a problem with my code, so please help me!
Upvotes: 3
Views: 2125
Reputation: 31
The above-mentioned solution will work fine, but when the data grows large you need to handle the reshuffling that groupByKey triggers. A better way is to create a DataFrame and use sqlContext to order by IP address and time.
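A minimal sketch of that approach, assuming Spark 1.x (hence SQLContext) and tab-separated fields as in the data above; the column names here are illustrative, not part of the original post:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object DataFrameApp {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("Test").setMaster("local[4]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.textFile("data.csv")
      .map(_.split("\t"))
      .map(f => (f(0), f(1), f(2), f(3), f(4), f(5).toDouble))
      .toDF("time", "srcIp", "dstIp", "srcPort", "dstPort", "rate")

    // orderBy runs one distributed sort instead of materializing whole
    // groups in memory, which scales better than groupByKey
    df.orderBy("srcIp", "time").show(20, false)
  }
}
Note that time is sorted as a plain string here, which happens to work for this sample because all rows share the same AM/PM value.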
Upvotes: 3
Reputation: 2072
Here we need to convert the data into key-value pairs in order to apply the groupByKey mechanism. After grouping, the values of each key form an Iterable; to sort the values of every key we convert the Iterable to a sequence and apply sortBy. Finally, flatMapValues flattens the sorted sequences back into individual records.
Data.csv ->
07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:48:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:36:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
Code ->
val data = sc.textFile("src/Data.csv")
  .map(line => {
    val GroupRecord = line.split("\t")
    // key: source IP (2nd field), value: the remaining fields
    (GroupRecord(1), (GroupRecord(0), GroupRecord(2), GroupRecord(3), GroupRecord(4), GroupRecord(5)))
  })

val numPurchases = data.count()

// group by source IP, sort each group's values by time, then flatten back to records
val d1 = data.groupByKey()
  .map(f => (f._1, f._2.toSeq.sortBy(_._1)))
  .flatMapValues(f => f)
  .map(f => (f._2._1, f._1, f._2._2, f._2._3, f._2._4, f._2._5))

d1.foreach(println)
println("No: " + numPurchases)
Upvotes: 0
Reputation: 13346
As Glennie pointed out, you don't create a key-value pair for the groupByKey operation. However, you can also use groupBy(_._3) to obtain the same result (a sketch of that variant follows the code below). In order to sort each group with respect to the first column, you can apply a flatMapValues after the grouping to sort the items in each group. The following code does exactly that:
import org.apache.spark.{SparkConf, SparkContext}

object ScalaApp {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val data = sc.textFile("data.csv")
      .map(line => line.split("\\s+"))
      .map(GroupRecord => (GroupRecord(2), (GroupRecord(0), GroupRecord(1), GroupRecord(2),
        GroupRecord(3), GroupRecord(4), GroupRecord(5))))
    // sort the groups by the first tuple field (the time)
    val result = data.groupByKey.flatMapValues(x => x.toList.sortBy(_._1))
    // assign the partition ID to each item to see that each group is sorted
    val resultWithPartitionID = result.mapPartitionsWithIndex((id, it) => it.map(x => (id, x)))
    // print the contents of the RDD; elements of different partitions might be interleaved
    resultWithPartitionID foreach println
    val collectedResult = resultWithPartitionID.collect.sortBy(_._1).map(_._2)
    // print the collected results
    println(collectedResult.mkString("\n"))
  }
}
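For completeness, here is a minimal sketch of the groupBy(_._3) variant mentioned above, assuming the same whitespace split so that the third tuple field is the source IP:
val tuples = sc.textFile("data.csv")
  .map(line => line.split("\\s+"))
  .map(r => (r(0), r(1), r(2), r(3), r(4), r(5)))

val grouped = tuples
  .groupBy(_._3)                        // key each Tuple6 by its source IP field
  .flatMapValues(_.toList.sortBy(_._1)) // sort each group by the time field

grouped.values.foreach(println)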
Upvotes: 2
Reputation: 13154
Your problem is that your second map creates a Tuple6 instead of a key-value pair, which is what is required if you want to do a xxxByKey operation. If you want to group by the 2nd column, you should make GroupRecord(1) your key and the rest the values, and then call groupByKey, like this:
data
  .map(GroupRecord => (GroupRecord(1), (GroupRecord(0), GroupRecord(2), GroupRecord(3), GroupRecord(4), GroupRecord(5))))
  .groupByKey()
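This covers only the grouping; to also order each group's records by time (the first value field), one possible continuation, sketched under the same assumptions, is to sort inside mapValues:
data
  .map(GroupRecord => (GroupRecord(1), (GroupRecord(0), GroupRecord(2), GroupRecord(3), GroupRecord(4), GroupRecord(5))))
  .groupByKey()
  .mapValues(_.toList.sortBy(_._1)) // order each group's records by time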
Upvotes: 3