Reputation: 137
I am new to PySpark. I am looking to convert the Spark SQL below to the DataFrame API:
sql("SELECT
t.transaction_category_id,
sum(t.transaction_amount) AS sum_amount,
count(DISTINCT t.user_id) AS num_users
FROM transactions t
JOIN users u USING (user_id)
WHERE t.is_blocked = False
AND u.is_active = 1
GROUP BY t.transaction_category_id
ORDER BY sum_amount DESC").show()
The tables are uneven in size: transactions is a large table, while users is much smaller. Can I apply a broadcast join and/or salting here?
Upvotes: 0
Views: 148
Reputation: 895
You can also use the following:
import pyspark.sql.functions as func
from pyspark.sql.functions import broadcast

output_df = (
    transactions
    .join(broadcast(users), users.user_id == transactions.user_id)
    .where((transactions.is_blocked == False) & (users.is_active == 1))
    .groupBy(transactions.transaction_category_id)
    .agg(
        func.countDistinct(users.user_id).alias('num_users'),
        func.sum(transactions.transaction_amount).alias('sum_amount')
    )
    .select(transactions.transaction_category_id, 'num_users', 'sum_amount')
)
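To confirm that the broadcast actually happens, you can inspect the physical plan, which should contain a BroadcastHashJoin. The small sketch below assumes the usual SparkSession variable named spark; Spark also broadcasts the smaller side automatically when its estimated size is under spark.sql.autoBroadcastJoinThreshold (10 MB by default).

# Sanity check: the physical plan should show a BroadcastHashJoin
output_df.explain()

# Optionally raise the automatic broadcast threshold (here 50 MB);
# setting it to -1 disables automatic broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)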
Upvotes: 1
Reputation: 1858
In the DataFrame API the query would look like:
import pyspark.sql.functions as f
output_df = (
transactions.alias('t')
.join(users.alias('u').hint('broadcast'), ['user_id'], 'inner')
.where((f.col('t.is_blocked') == False) & (f.col('u.is_active') == 1))
.groupBy(f.col('t.transaction_category_id'))
.agg(
f.sum(f.col('t.transaction_amount')).alias('sum_amount'),
f.count_distinct(f.col('t.user_id')).alias('num_users')
)
    .orderBy(f.col('sum_amount').desc())
)
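On the salting part of the question: with a broadcast join the large transactions table is never shuffled, so salting is normally not needed here. If users were too big to broadcast and user_id were skewed, a salted join is one option. The sketch below is only illustrative; NUM_SALTS is an assumed tuning knob and a SparkSession named spark is assumed, neither of which comes from the original query.

import pyspark.sql.functions as f

NUM_SALTS = 8  # assumption: tune to the observed skew

# Add a random salt to the large, skewed side
salted_tx = transactions.withColumn('salt', (f.rand() * NUM_SALTS).cast('int'))

# Replicate the smaller side once per salt value so every salted key finds a match
salts = spark.range(NUM_SALTS).select(f.col('id').cast('int').alias('salt'))
salted_users = users.crossJoin(salts)

output_df = (
    salted_tx.alias('t')
    .join(salted_users.alias('u'), ['user_id', 'salt'], 'inner')
    .where((f.col('t.is_blocked') == False) & (f.col('u.is_active') == 1))
    .groupBy(f.col('t.transaction_category_id'))
    .agg(
        f.sum(f.col('t.transaction_amount')).alias('sum_amount'),
        f.countDistinct(f.col('t.user_id')).alias('num_users')
    )
    .orderBy(f.col('sum_amount').desc())
)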
Upvotes: 1