Reputation: 571
I have the following raw data and I need to clean it:
03:35:20.299037 IP 10.0.0.1 > 10.0.0.2: ICMP echo request, id 8321, seq 17, length 64
03:35:20.327290 IP 10.0.0.1 > 10.0.0.3: ICMP echo reply, id 8321, seq 17, length 64
03:35:20.330845 IP 10.0.0.1 > 10.0.0.3: ICMP echo request, id 8311, seq 19, length 64
03:35:20.330892 IP 10.0.0.1 > 10.0.0.3: ICMP echo request, id 8321, seq 17, length 64
03:35:20.330918 IP 10.0.0.1 > 10.0.0.3: ICMP echo reply, id 8321, seq 17, length 64
03:35:20.330969 IP 10.0.0.1 > 10.0.0.4: ICMP echo request, id 8311, seq 19, length 64
03:35:20.331041 IP 10.0.0.1 > 10.0.0.4: ICMP echo request, id 8311, seq 19, length 64
I need to have the following output:
+---------------+-----------+-------------+----+------+---+----+
|   time_stamp_0|sender_ip_1|receiver_ip_2|rank| count| xi|  pi|
+---------------+-----------+-------------+----+------+---+----+
|03:35:20.299037|   10.0.0.1|     10.0.0.2|   1|     7|  1|0.14|
|03:35:20.327290|   10.0.0.1|     10.0.0.3|   1|     7|  4|0.57|
|03:35:20.330845|   10.0.0.1|     10.0.0.3|   2|     7|  4|0.57|
|03:35:20.330892|   10.0.0.1|     10.0.0.3|   3|     7|  4|0.57|
|03:35:20.330918|   10.0.0.1|     10.0.0.3|   4|     7|  4|0.57|
|03:35:20.330969|   10.0.0.1|     10.0.0.4|   1|     7|  2|0.28|
|03:35:20.331041|   10.0.0.1|     10.0.0.4|   2|     7|  2|0.28|
+---------------+-----------+-------------+----+------+---+----+
According to the above dataframe, I need to put the total number of repetitions of each (source IP, destination IP) pair in the "xi" column, the total number of rows in the "count" column, and the division xi/count in the "pi" column (for example, 10.0.0.3 appears as receiver in 4 of the 7 rows, so its xi = 4 and pi = 4/7 ≈ 0.57). My problem starts when I want to calculate xi, and I get the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Expression 'count#11L' not supported within a window function.;;
Project [time_stamp_0#3, sender_ip_1#4, receiver_ip_2#5, count#11L, rank#20, xi#45, pi#90]
+- Project [time_stamp_0#3, sender_ip_1#4, receiver_ip_2#5, count#11L, rank#20, xi#45, _we0#109L, (cast(xi#45 as double) / cast(_we0#109L as double)) AS pi#90]
+- Window [count#11L windowspecdefinition(sender_ip_1#4, receiver_ip_2#5, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#109L], [sender_ip_1#4, receiver_ip_2#5]
...
I have the following code:
// imports needed by the code below (sc and the SparkSession named session are defined elsewhere)
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.util.Try
import session.implicits._ // for the $"..." column syntax

val customSchema = StructType(Array(
StructField("time_stamp_0", StringType, true),
StructField("sender_ip_1", StringType, true),
StructField("receiver_ip_2", StringType, true)))
///////////////////////////////////////////////////make train dataframe
val Dstream_Train = sc.textFile("/Users/saeedtkh/Desktop/sharedsaeed/Test/dsetcopy.txt")
val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => {
  val array1 = array(0).trim.split("IP") // "<timestamp> " / " <sender> > <receiver>: ICMP echo ..."
  val array2 = array1(1).split(">")      // " <sender> " / " <receiver>: ICMP echo ..."
  val array3 = array2(1).split(":")      // " <receiver>" / " ICMP echo ..."
  val first = Try(array1(0).trim) getOrElse ""  // time_stamp_0
  val second = Try(array2(0).trim) getOrElse "" // sender_ip_1
  val third = Try(array3(0)) getOrElse ""       // receiver_ip_2
  Row.fromSeq(Seq(first, second, third))
})
val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
val columns1and2_count = Window.partitionBy("sender_ip_1", "receiver_ip_2")
val columns1and2_rank = Window.partitionBy("sender_ip_1", "receiver_ip_2").orderBy("time_stamp_0") // <-- matches groupBy
//Add count to df
val Dataframe_add_rank = Frist_Dataframe.withColumn("count", count($"receiver_ip_2") over columns1and2_count).distinct()
//Add rank() to df
val Dataframe_add_rank_count = Dataframe_add_rank.withColumn("rank", rank() over columns1and2_rank).distinct()// Dataframe.show()
//Add x(i) to df
val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", rank() over columns1and2_rank).distinct()// Dataframe.show()
//Add p(i)=maxrank(x(i))/count
val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / ($"count").over(columns1and2_count)) // <-- this line throws the AnalysisException above
val std_dev=Dataframe_add_rank_count.agg(stddev_pop($"rank"))
val stdDevValue = std_dev.head.getDouble(0)
std_dev.show()
//Add attack status
val final_add_count_rank_xi_attack = Dataframe_add_rank_count_xi_pi.withColumn("attack", when($"rank" < stdDevValue , 0).otherwise(1))
Dataframe_add_rank_count_xi_pi.show()
Update: Following the answer below, I got the following output:
+---------------+-----------+-------------+-----+----+---+---+------+
|   time_stamp_0|sender_ip_1|receiver_ip_2|count|rank| xi| pi|attack|
+---------------+-----------+-------------+-----+----+---+---+------+
|03:35:20.330969|   10.0.0.1|     10.0.0.4|    2|   1|  4|2.0|     0|
|03:35:20.331041|   10.0.0.1|     10.0.0.4|    2|   2|  4|2.0|     1|
|03:35:20.299037|   10.0.0.1|     10.0.0.2|    1|   1|  4|4.0|     0|
|03:35:20.327290|   10.0.0.1|     10.0.0.3|    4|   1|  4|1.0|     0|
|03:35:20.330845|   10.0.0.1|     10.0.0.3|    4|   2|  4|1.0|     1|
|03:35:20.330892|   10.0.0.1|     10.0.0.3|    4|   3|  4|1.0|     1|
|03:35:20.330918|   10.0.0.1|     10.0.0.3|    4|   4|  4|1.0|     1|
+---------------+-----------+-------------+-----+----+---+---+------+
So the xi is not correct. :( Can you help me? Thanks in advance.
Upvotes: 2
Views: 5350
Reputation: 41957
If you are looking to populate the xi column with the max of count, as you said:
the maximum number of repetition for each source and destination IP as "xi" column
then you should be doing
val maxForXI = Dataframe_add_rank_count.agg(max("rank")).first.get(0)
val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", lit(maxForXI))
you should get
+---------------+-----------+-------------+-----+----+---+
|time_stamp_0 |sender_ip_1|receiver_ip_2|count|rank|xi |
+---------------+-----------+-------------+-----+----+---+
|03:35:20.330969|10.0.0.1 | 10.0.0.4 |2 |1 |4 |
|03:35:20.331041|10.0.0.1 | 10.0.0.4 |2 |2 |4 |
|03:35:20.299037|10.0.0.1 | 10.0.0.2 |1 |1 |4 |
|03:35:20.327290|10.0.0.1 | 10.0.0.3 |4 |1 |4 |
|03:35:20.330845|10.0.0.1 | 10.0.0.3 |4 |2 |4 |
|03:35:20.330892|10.0.0.1 | 10.0.0.3 |4 |3 |4 |
|03:35:20.330918|10.0.0.1 | 10.0.0.3 |4 |4 |4 |
+---------------+-----------+-------------+-----+----+---+
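On this data the greatest rank (4) is the same as the largest per-pair count, so the same xi can also be filled in without collecting maxForXI to the driver, for example with a window over an empty partition spec. A minimal sketch, assuming the same Dataframe_add_rank_count as above (wholeDf and xiViaWindow are just illustrative names):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

// a window with no partition columns spans the whole DataFrame
// (Spark will warn that all data is moved to a single partition)
val wholeDf = Window.partitionBy()
val xiViaWindow = Dataframe_add_rank_count.withColumn("xi", max($"rank") over wholeDf)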
For the last column, pi, you said:
the division of xi/count as "pi" column
so for that you should do
val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / $"count")
You should have the desired result as
+---------------+-----------+-------------+-----+----+---+---+
|   time_stamp_0|sender_ip_1|receiver_ip_2|count|rank| xi| pi|
+---------------+-----------+-------------+-----+----+---+---+
|03:35:20.330969|   10.0.0.1|     10.0.0.4|    2|   1|  4|2.0|
|03:35:20.331041|   10.0.0.1|     10.0.0.4|    2|   2|  4|2.0|
|03:35:20.299037|   10.0.0.1|     10.0.0.2|    1|   1|  4|4.0|
|03:35:20.327290|   10.0.0.1|     10.0.0.3|    4|   1|  4|1.0|
|03:35:20.330845|   10.0.0.1|     10.0.0.3|    4|   2|  4|1.0|
|03:35:20.330892|   10.0.0.1|     10.0.0.3|    4|   3|  4|1.0|
|03:35:20.330918|   10.0.0.1|     10.0.0.3|    4|   4|  4|1.0|
+---------------+-----------+-------------+-----+----+---+---+
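As an aside, the AnalysisException in the question comes from ($"count").over(columns1and2_count): .over() only accepts a window function such as rank(), count() or max(), not a reference to an already-computed column. A minimal sketch of the distinction, reusing the window spec from the question (withXi and withPi are just illustrative names):
// works: the window spec wraps the aggregate function itself
val withXi = Dataframe_add_rank_count.withColumn("xi", count($"receiver_ip_2") over columns1and2_count)
// fails with the same error: $"count" is a plain column, not a window function
// val broken = withXi.withColumn("pi", $"xi" / ($"count").over(columns1and2_count))
// works: the division is ordinary column arithmetic and needs no over() at all
val withPi = withXi.withColumn("pi", $"xi" / $"count")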
Edited
Since the question was edited, the final edited answer is as below (this is the code after the window functions are defined):
//Add count (total number of rows) to df
val cnt = Frist_Dataframe.count
val Dataframe_add_rank = Frist_Dataframe.withColumn("count", lit(cnt))
//Add rank() to df
val Dataframe_add_rank_count = Dataframe_add_rank.withColumn("rank", rank() over columns1and2_rank).distinct()// Dataframe.show()
//Add x(i) to df
val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", count($"receiver_ip_2") over columns1and2_count)// Dataframe.show()
//Add p(i) = x(i)/count
val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / $"count")// Dataframe.show()
val std_dev=Dataframe_add_rank_count.agg(stddev_pop($"rank"))
val stdDevValue = std_dev.head.getDouble(0)
std_dev.show()
//Add attack status
val final_add_count_rank_xi_attack = Dataframe_add_rank_count_xi_pi.withColumn("attack", when($"rank" < stdDevValue , 0).otherwise(1))
final_add_count_rank_xi_attack.show()
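If pi should also carry the two-decimal formatting shown in the question's desired output (0.14, 0.57, ...), one option is to round the ratio when building the column. A small sketch, assuming the same Dataframe_add_rank_count_xi as above (withRoundedPi is just an illustrative name; note that rounding 2/7 gives 0.29 where the question's table shows the truncated 0.28):
import org.apache.spark.sql.functions.round

// same division as before, rounded to two decimals
val withRoundedPi = Dataframe_add_rank_count_xi.withColumn("pi", round($"xi" / $"count", 2))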
I hope you get the right table this time.
Upvotes: 4