Durga

Reputation: 137

Convert Spark SQL to DataFrame API

I am new to PySpark. I am looking to convert the Spark SQL below to the DataFrame API:

sql("SELECT
t.transaction_category_id,
sum(t.transaction_amount) AS sum_amount,
count(DISTINCT t.user_id) AS num_users
FROM transactions t
JOIN users u USING (user_id)
WHERE t.is_blocked = False
AND u.is_active = 1
GROUP BY t.transaction_category_id
ORDER BY sum_amount DESC").show()

The tables are uneven in size: the transactions table is much larger than the users table. Can I apply a broadcast join or salting here?

Upvotes: 0

Views: 148

Answers (2)

Subash

Reputation: 895

You can also use the approach below:

import pyspark.sql.functions as func

output_df = (
    transactions
    .join(func.broadcast(users), users.user_id == transactions.user_id)
    .where((transactions.is_blocked == False) & (users.is_active == 1))
    .groupBy(transactions.transaction_category_id)
    .agg(
        func.countDistinct(users.user_id).alias('num_users'),
        func.sum(transactions.transaction_amount).alias('sum_amount')
    )
    .select(transactions.transaction_category_id, 'num_users', 'sum_amount')
)

Upvotes: 1

ARCrow

Reputation: 1858

The join part of the query would look like:

import pyspark.sql.functions as f
output_df = (
    transactions.alias('t')
    .join(users.alias('u').hint('broadcast'), ['user_id'], 'inner')
    .where((f.col('t.is_blocked') == False) & (f.col('u.is_active') == 1))
    .groupBy(f.col('t.transaction_category_id'))
    .agg(
        f.sum(f.col('t.transaction_amount')).alias('sum_amount'),
        f.count_distinct(f.col('t.user_id')).alias('num_users')
    )
    .orderBy(f.col('sum_amount').desc())
)
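
If users were too large to broadcast and the join key were skewed, salting could be used instead of the broadcast hint. A minimal sketch of the idea (the salt count of 8 and the column name salt are arbitrary choices for illustration, not taken from the question):

import pyspark.sql.functions as f

num_salts = 8  # arbitrary example value; tune to the degree of skew

# Add a random salt to the large, skewed side of the join.
salted_transactions = transactions.withColumn(
    'salt', (f.rand() * num_salts).cast('int')
)

# Duplicate the small side so every salt value has a matching row.
salted_users = users.withColumn(
    'salt', f.explode(f.array([f.lit(i) for i in range(num_salts)]))
)

# Join on both the original key and the salt, then drop the helper column.
joined = salted_transactions.join(
    salted_users, ['user_id', 'salt'], 'inner'
).drop('salt')

The aggregation and ordering would then be applied to joined exactly as above.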

Upvotes: 1
