Queen

Reputation: 571

Error "not supported within a window function" when operating on DataFrame columns in Spark (Scala)

I have the following raw data and I need to clean it:

    03:35:20.299037 IP 10.0.0.1 > 10.0.0.2: ICMP echo request, id 8321, seq 17, length 64
    03:35:20.327290 IP 10.0.0.1 > 10.0.0.3: ICMP echo reply, id 8321, seq 17, length 64
    03:35:20.330845 IP 10.0.0.1 > 10.0.0.3: ICMP echo request, id 8311, seq 19, length 64
    03:35:20.330892 IP 10.0.0.1 > 10.0.0.3: ICMP echo request, id 8321, seq 17, length 64
    03:35:20.330918 IP 10.0.0.1 > 10.0.0.3: ICMP echo reply, id 8321, seq 17, length 64
    03:35:20.330969 IP 10.0.0.1 > 10.0.0.4: ICMP echo request, id 8311, seq 19, length 64
    03:35:20.331041 IP 10.0.0.1 > 10.0.0.4: ICMP echo request, id 8311, seq 19, length 64

I need to have the following output:

+---------------+-----------+-------------+----+------+---+----+
|   time_stamp_0|sender_ip_1|receiver_ip_2|rank| count| xi|  pi|
+---------------+-----------+-------------+----+------+---+----+
|03:35:20.299037|   10.0.0.1|     10.0.0.2|   1|     7|  1|0.14|
|03:35:20.327290|   10.0.0.1|     10.0.0.3|   1|     7|  4|0.57|
|03:35:20.330845|   10.0.0.1|     10.0.0.3|   2|     7|  4|0.57|
|03:35:20.330892|   10.0.0.1|     10.0.0.3|   3|     7|  4|0.57|
|03:35:20.330918|   10.0.0.1|     10.0.0.3|   4|     7|  4|0.57|
|03:35:20.330969|   10.0.0.1|     10.0.0.4|   1|     7|  2|0.28|
|03:35:20.331041|   10.0.0.1|     10.0.0.4|   2|     7|  2|0.28|
+---------------+-----------+-------------+----+------+---+----+

In the dataframe above, the "xi" column should hold the number of rows that share the same source IP and destination IP, the "count" column should hold the total number of rows, and the "pi" column should hold the division xi/count. The problem starts when I try to calculate xi; I get the following error:

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Expression 'count#11L' not supported within a window function.;;
    Project [time_stamp_0#3, sender_ip_1#4, receiver_ip_2#5, count#11L, rank#20, xi#45, pi#90]
    +- Project [time_stamp_0#3, sender_ip_1#4, receiver_ip_2#5, count#11L, rank#20, xi#45, _we0#109L, (cast(xi#45 as double) / cast(_we0#109L as double)) AS pi#90]
       +- Window [count#11L windowspecdefinition(sender_ip_1#4, receiver_ip_2#5, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#109L], [sender_ip_1#4, receiver_ip_2#5]

    ...

I have the following code:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import scala.util.Try
    // `sc` (SparkContext) and `session` (SparkSession) are created elsewhere
    import session.implicits._

    val customSchema = StructType(Array(
      StructField("time_stamp_0", StringType, true),
      StructField("sender_ip_1", StringType, true),
      StructField("receiver_ip_2", StringType, true)))

    // Make train dataframe
    val Dstream_Train = sc.textFile("/Users/saeedtkh/Desktop/sharedsaeed/Test/dsetcopy.txt")

    val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => {
      // "<time> IP <sender> > <receiver>: ..." -> time, sender, receiver
      val array1 = array(0).trim.split("IP")
      val array2 = array1(1).split(">")
      val array3 = array2(1).split(":")

      val first = Try(array1(0).trim) getOrElse ""
      val second = Try(array2(0).trim) getOrElse ""
      val third = Try(array3(0)) getOrElse ""

      Row.fromSeq(Seq(first, second, third))
    })

    val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
    // Windows over each (sender, receiver) pair; matches a groupBy on the same columns
    val columns1and2_count = Window.partitionBy("sender_ip_1", "receiver_ip_2")
    val columns1and2_rank = Window.partitionBy("sender_ip_1", "receiver_ip_2").orderBy("time_stamp_0")

    // Add count to df
    val Dataframe_add_rank = Frist_Dataframe.withColumn("count", count($"receiver_ip_2") over columns1and2_count).distinct()
    // Add rank() to df
    val Dataframe_add_rank_count = Dataframe_add_rank.withColumn("rank", rank() over columns1and2_rank).distinct()
    // Add x(i) to df
    val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", rank() over columns1and2_rank).distinct()
    // Add p(i) = x(i) / count
    // This line raises the AnalysisException: $"count" is a plain column, not an aggregate or window function
    val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / ($"count").over(columns1and2_count))

    val std_dev = Dataframe_add_rank_count.agg(stddev_pop($"rank"))
    val stdDevValue = std_dev.head.getDouble(0)
    std_dev.show()

    // Add attack status
    val final_add_count_rank_xi_attack = Dataframe_add_rank_count_xi_pi.withColumn("attack", when($"rank" < stdDevValue, 0).otherwise(1))

    Dataframe_add_rank_count_xi_pi.show()

Update: Following the first answer, I got this output:

+---------------+-----------+-------------+-----+----+---+---+------+
|   time_stamp_0|sender_ip_1|receiver_ip_2|count|rank| xi| pi|attack|
+---------------+-----------+-------------+-----+----+---+---+------+
|03:35:20.330969|   10.0.0.1|     10.0.0.4|    2|   1|  4|2.0|     0|
|03:35:20.331041|   10.0.0.1|     10.0.0.4|    2|   2|  4|2.0|     1|
|03:35:20.299037|   10.0.0.1|     10.0.0.2|    1|   1|  4|4.0|     0|
|03:35:20.327290|   10.0.0.1|     10.0.0.3|    4|   1|  4|1.0|     0|
|03:35:20.330845|   10.0.0.1|     10.0.0.3|    4|   2|  4|1.0|     1|
|03:35:20.330892|   10.0.0.1|     10.0.0.3|    4|   3|  4|1.0|     1|
|03:35:20.330918|   10.0.0.1|     10.0.0.3|    4|   4|  4|1.0|     1|
+---------------+-----------+-------------+-----+----+---+---+------+

So xi is still not correct. :( Can you help me? Thanks in advance.

Upvotes: 2

Views: 5350

Answers (1)

Ramesh Maharjan

Reputation: 41957

If you want to populate the xi column with the maximum repetition count, as you said

the maximum number of repetition for each source and destination IP as "xi" column

then you can do

val maxForXI = Dataframe_add_rank_count.agg(max("rank")).first.get(0)
val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", lit(maxForXI))

You should get

+---------------+-----------+-------------+-----+----+---+
|time_stamp_0   |sender_ip_1|receiver_ip_2|count|rank|xi |
+---------------+-----------+-------------+-----+----+---+
|03:35:20.330969|10.0.0.1   | 10.0.0.4    |2    |1   |4  |
|03:35:20.331041|10.0.0.1   | 10.0.0.4    |2    |2   |4  |
|03:35:20.299037|10.0.0.1   | 10.0.0.2    |1    |1   |4  |
|03:35:20.327290|10.0.0.1   | 10.0.0.3    |4    |1   |4  |
|03:35:20.330845|10.0.0.1   | 10.0.0.3    |4    |2   |4  |
|03:35:20.330892|10.0.0.1   | 10.0.0.3    |4    |3   |4  |
|03:35:20.330918|10.0.0.1   | 10.0.0.3    |4    |4   |4  |
+---------------+-----------+-------------+-----+----+---+

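For the last column, pi, you said

the division of xi/count as "pi" column

so for that, divide the two columns directly. over() can only be applied to an aggregate or window function, so calling it on the plain column $"count" is what raises the "not supported within a window function" exception: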

val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / $"count")

You should get the desired result:

+---------------+-----------+-------------+-----+----+---+---+
|   time_stamp_0|sender_ip_1|receiver_ip_2|count|rank| xi| pi|
+---------------+-----------+-------------+-----+----+---+---+
|03:35:20.330969|   10.0.0.1|     10.0.0.4|    2|   1|  4|2.0|
|03:35:20.331041|   10.0.0.1|     10.0.0.4|    2|   2|  4|2.0|
|03:35:20.299037|   10.0.0.1|     10.0.0.2|    1|   1|  4|4.0|
|03:35:20.327290|   10.0.0.1|     10.0.0.3|    4|   1|  4|1.0|
|03:35:20.330845|   10.0.0.1|     10.0.0.3|    4|   2|  4|1.0|
|03:35:20.330892|   10.0.0.1|     10.0.0.3|    4|   3|  4|1.0|
|03:35:20.330918|   10.0.0.1|     10.0.0.3|    4|   4|  4|1.0|
+---------------+-----------+-------------+-----+----+---+---+

Edited

Since the question was edited, the final answer is as follows (this code goes after the window definitions):

// Add the total number of rows as count
val cnt = Frist_Dataframe.count
val Dataframe_add_rank = Frist_Dataframe.withColumn("count", lit(cnt))
// Add rank() to df
val Dataframe_add_rank_count = Dataframe_add_rank.withColumn("rank", rank() over columns1and2_rank).distinct()
// Add x(i): number of rows per (sender, receiver) pair
val Dataframe_add_rank_count_xi = Dataframe_add_rank_count.withColumn("xi", count($"receiver_ip_2") over columns1and2_count)
// Add p(i) = x(i) / count
val Dataframe_add_rank_count_xi_pi = Dataframe_add_rank_count_xi.withColumn("pi", $"xi" / $"count")

val std_dev = Dataframe_add_rank_count.agg(stddev_pop($"rank"))
val stdDevValue = std_dev.head.getDouble(0)
std_dev.show()

// Add attack status
val final_add_count_rank_xi_attack = Dataframe_add_rank_count_xi_pi.withColumn("attack", when($"rank" < stdDevValue, 0).otherwise(1))
final_add_count_rank_xi_attack.show()
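
If you want to sanity-check the numbers without starting Spark, here is a minimal sketch in plain Scala collections that computes the same columns for the seven rows in the question. The names `PiCheck`, `Stat` and `stats` are made up for this illustration and are not part of Spark. It assumes timestamps are unique within each (sender, receiver) pair, so the position in timestamp order equals what rank() would give.

```scala
// Plain Scala collections only; no Spark needed for this check.
object PiCheck {
  // One record per packet: (time_stamp_0, sender_ip_1, receiver_ip_2), copied from the question.
  val packets: Seq[(String, String, String)] = Seq(
    ("03:35:20.299037", "10.0.0.1", "10.0.0.2"),
    ("03:35:20.327290", "10.0.0.1", "10.0.0.3"),
    ("03:35:20.330845", "10.0.0.1", "10.0.0.3"),
    ("03:35:20.330892", "10.0.0.1", "10.0.0.3"),
    ("03:35:20.330918", "10.0.0.1", "10.0.0.3"),
    ("03:35:20.330969", "10.0.0.1", "10.0.0.4"),
    ("03:35:20.331041", "10.0.0.1", "10.0.0.4")
  )

  // Hypothetical output row mirroring the columns of the target table.
  final case class Stat(timeStamp: String, sender: String, receiver: String,
                        rank: Int, count: Int, xi: Int, pi: Double)

  def stats(rows: Seq[(String, String, String)]): Seq[Stat] = {
    val count = rows.size // total number of rows, identical for every row
    rows
      .groupBy { case (_, s, r) => (s, r) } // same key as Window.partitionBy
      .toSeq
      .flatMap { case ((s, r), group) =>
        val xi = group.size // rows sharing this (sender, receiver) pair
        // Position in timestamp order; equals rank() when timestamps are unique.
        group.sortBy(_._1).zipWithIndex.map { case ((ts, _, _), i) =>
          Stat(ts, s, r, i + 1, count, xi, xi.toDouble / count)
        }
      }
      .sortBy(_.timeStamp)
  }

  def main(args: Array[String]): Unit =
    stats(packets).foreach(println)
}
```

With this input, count is 7 for every row and xi is 1, 4 and 2 for the pairs ending in 10.0.0.2, 10.0.0.3 and 10.0.0.4, so pi is 1/7, 4/7 and 2/7. Those are the values the Spark version above should reproduce.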

I hope you get the right table this time.

Upvotes: 4
