Aryan Tyagi

Reputation: 71

Apply the filter condition using the json data

I generate JSON data from a UI that takes input from the user and writes it to a JSON file. We then use the data in that JSON file to filter records on two columns (country and currency).

json__data = {
"Year": 2019,
"EXCHANGE":
    [
        {
            "total": 142250.0,
            "COUNTRY":"JPN",
            "CUR": "YEN"
        },
        {
            "total": 210.88999999999996,
            "COUNTRY": "US",
            "CUR": "DOL"
        },
        {
            "total": 1065600.0,
            "COUNTRY": "UK",
            "CUR": "POU"
        }
]

}

Above is a sample of the JSON file. I have another file, demo.csv, that has multiple columns (Name, Country, Contact_details, Currency, etc.). I need only the records whose values are contained in the JSON file. For example, I need records with country in (US, UK, JPN) and currency in (DOL, POU, YEN). I need the logic to implement this condition.

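import json
from pyspark.sql import SparkSession

class Test(object):
    def __init__(self, json__data):
        # json__data is the JSON text; the records sit under the "EXCHANGE" key
        self.dict = json.loads(json__data)
        self.a = [data['COUNTRY'] for data in self.dict['EXCHANGE']]   # a = [JPN, US, UK]
        self.b = [data['CUR'] for data in self.dict['EXCHANGE']]       # b = [YEN, DOL, POU]

    def spark(self):
        ss = SparkSession.builder.master("local[1]").appName("This is sample csv").getOrCreate()
        # header=true so the columns can be referenced by name (COUNTRY, CURRENCY)
        df = ss.read.format("csv").option("header", "true").load("C:\\Users\\demo.csv")
        df.createOrReplaceTempView("df")
        a = ", ".join("'{}'".format(v) for v in self.a)
        b = ", ".join("'{}'".format(v) for v in self.b)
        # I need all records satisfying the given filter conditions
        dt = ss.sql("SELECT * FROM df WHERE COUNTRY IN ({}) AND CURRENCY IN ({})".format(a, b))
        dt.show()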

Upvotes: 0

Views: 591

Answers (1)

blackbishop

Reputation: 32700

You can simply use filter with isin, where a and b are the lists of country and currency values taken from the JSON:

import pyspark.sql.functions as F

df1 = df.filter(F.col("COUNTRY").isin(a) & F.col("CURRENCY").isin(b))
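The filter above assumes that a and b already exist. A minimal sketch of building them with the standard json module, assuming the JSON text is available in a string (json_text is a hypothetical name used only for this illustration, holding the sample from the question):

```python
import json

# Hypothetical stand-in for the contents of the generated JSON file.
json_text = """
{
  "Year": 2019,
  "EXCHANGE": [
    {"total": 142250.0, "COUNTRY": "JPN", "CUR": "YEN"},
    {"total": 210.88999999999996, "COUNTRY": "US", "CUR": "DOL"},
    {"total": 1065600.0, "COUNTRY": "UK", "CUR": "POU"}
  ]
}
"""

data = json.loads(json_text)

# The records live under the "EXCHANGE" key, not at the top level.
a = [entry["COUNTRY"] for entry in data["EXCHANGE"]]
b = [entry["CUR"] for entry in data["EXCHANGE"]]

print(a)  # ['JPN', 'US', 'UK']
print(b)  # ['YEN', 'DOL', 'POU']
```

The resulting plain Python lists can be passed directly to isin.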

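Or you can read the JSON file using Spark, then join it with the other dataframe from the CSV. The records are nested in the EXCHANGE array and the currency key is CUR, so they have to be exploded and renamed first. Note that the join keeps only rows whose (COUNTRY, CURRENCY) pair appears in the JSON, whereas the two isin filters accept any combination of listed values:

df_json = (
    spark.read.option("multiLine", True).json("path_to_json")
    .select(F.explode("EXCHANGE").alias("e"))
    .select(F.col("e.COUNTRY").alias("COUNTRY"), F.col("e.CUR").alias("CURRENCY"))
)
df_csv = spark.read.csv("path_to_csv", header=True)

result = df_csv.join(
    df_json,
    on=["COUNTRY", "CURRENCY"],
    how="inner"
)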
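The two approaches are not always equivalent. The following plain-Python sketch (no Spark, with made-up rows standing in for demo.csv) illustrates the difference between checking each column independently and matching on the pair, which is what an inner join on both columns does:

```python
# Hypothetical rows standing in for demo.csv; the values are invented for illustration.
rows = [
    {"Name": "A", "COUNTRY": "US", "CURRENCY": "DOL"},
    {"Name": "B", "COUNTRY": "US", "CURRENCY": "YEN"},
    {"Name": "C", "COUNTRY": "FR", "CURRENCY": "EUR"},
]

# (COUNTRY, CUR) pairs as they appear in the sample JSON.
pairs = {("JPN", "YEN"), ("US", "DOL"), ("UK", "POU")}
countries = {country for country, _ in pairs}
currencies = {currency for _, currency in pairs}

# Equivalent of two independent isin checks: any listed country with any listed currency.
isin_style = [
    r["Name"] for r in rows
    if r["COUNTRY"] in countries and r["CURRENCY"] in currencies
]

# Equivalent of the inner join: the (country, currency) pair itself must be listed.
join_style = [
    r["Name"] for r in rows
    if (r["COUNTRY"], r["CURRENCY"]) in pairs
]

print(isin_style)  # ['A', 'B']
print(join_style)  # ['A']
```

Row B passes the isin-style check because US and YEN are each listed, but it is dropped by the pair match because (US, YEN) is not a pair in the JSON. Which behaviour is wanted depends on the requirement.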

Upvotes: 1
