pdangelo4

Reputation: 230

pyspark list iterate to variable

How can I create a function that uses a list of strings to iterate over the following? The intention is that a, b, and c represent tables the users upload. The goal is to iterate programmatically no matter how many tables the users upload. I'm just looking to pull the counts of new rows, broken out by table.

mylist = df.select('S_ID').distinct().rdd.flatMap(lambda x: x).collect()

mylist
>> ['a', 'b', 'c']

##Count new rows by S_ID type
a = df.filter(df.S_ID == 'a').count()
b = df.filter(df.S_ID == 'b').count()
c = df.filter(df.S_ID == 'c').count()

##Count current rows from Snowflake
a_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'a'").load()
a_current = a_current.count()

b_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'b'").load()
b_current = b_current.count()

c_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'c'").load()
c_current = c_current.count()

##Calculate count of new rows
a_new = a - a_current
a_new = str(a_new)

b_new = b - b_current
b_new = str(b_new)

c_new = c - c_current
c_new = str(c_new)

Something like this:

new_counts_list = []
for i in mylist:
    i = df.filter(df.S_ID == 'i').count()
    
    i_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'i'").load()
    i_current = i_current.count()
    
    i_new = i - i_current
    i_new = str(i_new)

    new_counts_list.append(i)

I'm stuck on keeping the {names : new_counts}

Upvotes: 2

Views: 1974

Answers (1)

ggordon

Reputation: 10035

As it pertains to:

I'm stuck on keeping the {names : new_counts}

at the end of your for loop you may use

new_counts_list[i]=i_new

instead of

new_counts_list.append(i)

assuming that you change how new_counts_list is initialized, i.e. initialize it as a dict (new_counts_list={}) instead of a list.

You also seem to be hardcoding the literal value 'i', which is a string, instead of using the variable i (i.e. without the quotes) in your proposed solution. Your updated solution may look like:

new_counts_list={}
for i in mylist:
    # count of new rows for this S_ID in the uploaded data
    i_count = df.filter(df.S_ID == i).count()

    # count of existing rows for this S_ID in Snowflake
    i_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = '{0}'".format(i)).load()
    i_current = i_current.count()

    i_new = i_count - i_current
    i_new = str(i_new)

    new_counts_list[i]=i_new
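With new_counts_list initialized as a dict, the result maps each S_ID to its new-row count as a string, for example (hypothetical values):

new_counts_list
>> {'a': '25', 'b': '0', 'c': '113'}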

On another note, while your approach of sequentially looping through each S_ID in mylist and running the operations, i.e.

  1. Running the action collect to pull all the distinct S_ID values from your initial dataframe df to your driver node into the list mylist
  2. Separately counting the number of occurrences of each S_ID in your initial dataframe, each count() being another potentially expensive action (IO reads/network communication/shuffles)
  3. Creating another dataframe with spark.read.format(SNOWFLAKE_SOURCE_NAME) that will load all records filtered by each S_ID into memory before executing a count
  4. Finding the difference between the initial dataframe counts and the Snowflake source counts

will work, it is expensive in IO reads and, depending on your cluster/setup, potentially expensive in network communication and shuffles.

You may consider using a groupBy to reduce the number of times you execute these potentially expensive actions. Furthermore, you may also join the initial dataframe to your Snowflake source and let Spark optimize your operations as a lazy execution plan distributed across your cluster/setup. Moreover, similar to how you are using the pushdown filter for the Snowflake source, you may combine all selected S_ID values in that query to allow Snowflake to reduce all the desired results in one read. You would not need a loop. This could potentially look like:

Approach 1

In this approach, I will provide a pure Spark solution for achieving your desired results.

from pyspark.sql import functions as F
# Ask spark to select only the `S_ID` and group the data but not execute the transformation
my_existing_counts_df = df.select('S_ID').groupBy('S_ID').count()

# Ask spark to read only the per-`R_ID` counts from the snowflake source
current_counts_df = (
    spark.read
         .format(SNOWFLAKE_SOURCE_NAME)
         .options(**sfOptions)
         .option("query", "select R_ID, COUNT(1) as cnt FROM mytable GROUP BY R_ID")
         .load()
)

# Join both datasets which will filter to only selected `S_ID`
# and determine the differences between the existing and current counts
results_df = (
    my_existing_counts_df.alias("existing")
                        .join(
                            current_counts_df.alias("current"),
                            F.col("S_ID")=F.col("R_ID"),
                            "inner"
                        )
                        .selectExpr(
                            "S_ID",
                            "count - cnt as actual_count"
                        )
)

# Execute the above transformations with `collect` and
# convert the collected rows into your desired final dictionary
new_counts = {}
for row in results_df.collect():
    new_counts[row['S_ID']]=row['actual_count']

# your desired results are in `new_counts`
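As a purely stylistic alternative, the collection loop above can also be written as a dict comprehension over the same collect() result:

# equivalent to the loop above
new_counts = {row['S_ID']: row['actual_count'] for row in results_df.collect()}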

Approach 2

In this approach, I will collect the results of the group by and then use them to optimize the pushdown query to the Snowflake schema to return the desired results.

my_list_counts = df.select('S_ID').groupBy('S_ID').count().collect()
selected_sids = []
case_expression = ""
for row in my_list_counts:
    selected_sids.append(row['S_ID'])
    case_expression = case_expression + " WHEN R_ID='{0}' THEN {1} ".format(
        row['S_ID'],
        row['count']
    )

# The above produces rows with columns `S_ID` and `count`, where the
# latter is the number of occurrences of that `S_ID` in the dataset `df`

snowflake_push_down_query="""
SELECT
    R_ID AS S_ID,
    ((CASE
          {0}
      END) - cnt) as actual_count
FROM (
    SELECT
        R_ID,
        COUNT(1) AS cnt
    FROM
        mytable
    WHERE
        R_ID IN ('{1}')
    GROUP BY
        R_ID
) t
""".format(
    case_expression,
    "','".join(selected_sids)
)
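For illustration only, suppose df contained the S_IDs 'a', 'b' and 'c' with hypothetical counts 10, 20 and 30; the generated pushdown query would then render roughly as:

SELECT
    R_ID AS S_ID,
    ((CASE
          WHEN R_ID='a' THEN 10  WHEN R_ID='b' THEN 20  WHEN R_ID='c' THEN 30
      END) - cnt) as actual_count
FROM (
    SELECT
        R_ID,
        COUNT(1) AS cnt
    FROM
        mytable
    WHERE
        R_ID IN ('a','b','c')
    GROUP BY
        R_ID
) t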

results_df = (
    spark.read
         .format(SNOWFLAKE_SOURCE_NAME)
         .options(**sfOptions)
         .option("query", snowflake_push_down_query)
         .load()
)

# Execute the above transformations with `collect` and
# convert the collected rows into your desired final dictionary
new_counts = {}
for row in results_df.collect():
    new_counts[row['S_ID']]=row['actual_count']

# your desired results are in `new_counts`

Let me know if this works for you.

Upvotes: 1
