I am new to PySpark and I am struggling to count how many times each IP address occurs in the following list:
```python
sampleJson = [
    ('{"user":100, "ips" : ["191.168.192.101", "191.168.192.103", "191.168.192.96", "191.168.192.99"]}',),
    ('{"user":101, "ips" : ["191.168.192.102", "191.168.192.105", "191.168.192.103", "191.168.192.107"]}',),
    ('{"user":102, "ips" : ["191.168.192.105", "191.168.192.101", "191.168.192.105", "191.168.192.107"]}',),
    ('{"user":103, "ips" : ["191.168.192.96", "191.168.192.100", "191.168.192.107", "191.168.192.101"]}',),
    ('{"user":104, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.102", "191.168.192.99"]}',),
    ('{"user":105, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.100", "191.168.192.96"]}',),
]
```
Ideally, I need the results to look something like this:
| ip              | count |
|-----------------|-------|
| 191.168.192.96  | 3     |
| 191.168.192.99  | 6     |
| 191.168.192.100 | 2     |
| 191.168.192.101 | 3     |
| 191.168.192.102 | 2     |
| 191.168.192.103 | 2     |
| 191.168.192.105 | 3     |
| 191.168.192.107 | 3     |
I have written the code below, which gives me one column with the user and another column with their IPs, but I cannot work out how to extract the counts of each IP into the desired output. Can anyone help?
```python
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

json_df = spark.createDataFrame(sampleJson)
sch = StructType([
    StructField('user', StringType(), False),
    StructField('ips', ArrayType(StringType()))
])
# Parse the JSON string in column "_1" and flatten the struct into columns.
# Note: show() returns None, so it must not be assigned back to json_df.
json_df = json_df.withColumn("n", from_json(col("_1"), sch)).select("n.*")
json_df.show(10, False)
```
This produces:

| user | ips                                                                   |
|------|-----------------------------------------------------------------------|
| 100  | [191.168.192.101, 191.168.192.103, 191.168.192.96, 191.168.192.99]   |
| 101  | [191.168.192.102, 191.168.192.105, 191.168.192.103, 191.168.192.107] |
| 102  | [191.168.192.105, 191.168.192.101, 191.168.192.105, 191.168.192.107] |
| 103  | [191.168.192.96, 191.168.192.100, 191.168.192.107, 191.168.192.101]  |
| 104  | [191.168.192.99, 191.168.192.99, 191.168.192.102, 191.168.192.99]    |
| 105  | [191.168.192.99, 191.168.192.99, 191.168.192.100, 191.168.192.96]    |
You can use the `explode` function to transform the array elements into rows:
```python
json_df = spark.createDataFrame(sampleJson)
sch = StructType([
    StructField('user', StringType(), False),
    StructField('ips', ArrayType(StringType()))
])
json_df = json_df.withColumn("n", from_json(col("_1"), sch)).select("n.*")

# Produce one row per (user, ip) pair, then count rows per ip.
json_df = json_df \
    .withColumn('ip', explode("ips")) \
    .groupby('ip') \
    .agg(count('*').alias('count'))

json_df.show()
```
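With the sample data this yields the counts in the desired table (for example, 191.168.192.99 appears 6 times), although the output order of a `groupBy` is not guaranteed. As a minimal sketch of an equivalent shorthand, assuming `sampleJson` and `sch` are defined as in the question: `groupBy(...).count()` performs the same aggregation and names the result column `count` automatically, and the `orderBy` on the numeric last octet is an optional addition of mine (not part of the answer above) to reproduce the row order shown in the question.

```python
# Equivalent shorthand sketch (assumes sampleJson and sch from above).
counts = spark.createDataFrame(sampleJson) \
    .withColumn("n", from_json(col("_1"), sch)) \
    .select("n.*") \
    .withColumn("ip", explode("ips")) \
    .groupBy("ip") \
    .count()  # shorthand for agg(count('*').alias('count'))

# Optional: sort by the numeric last octet to match the question's ordering
# (this ordering step is my addition, not part of the original answer).
counts.orderBy(split("ip", "\\.").getItem(3).cast("int")).show(truncate=False)
```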