Reputation: 25
I need, for each choice on a multiple-choice test, the number of respondents who selected that choice. I have data in the following format with, say, 3 people and 100 questions:
+--------+--------+-----+------------+------------+-----+-------------+--------------+
| misc_1 | misc_2 | ... | Answer_A_1 | Answer_A_2 | ... | Answer_D_99 | Answer_D_100 |
+--------+--------+-----+------------+------------+-----+-------------+--------------+
| James  | 2345   | ... | 0          | 1          | ... | 0           | 1            |
| Anna   | 5434   | ... | 1          | 0          | ... | 0           | 1            |
| Robert | 7890   | ... | 0          | 1          | ... | 1           | 0            |
+--------+--------+-----+------------+------------+-----+-------------+--------------+
And I would like to get the totals for each answer choice in a DataFrame like this:
+---+---+---+---+----------+
| A | B | C | D | Question |
+---+---+---+---+----------+
| 1 | 0 | 1 | 1 | 1        |
| 2 | 1 | 0 | 1 | 2        |
| 0 | 3 | 0 | 0 | 3        |
| : | : | : | : | :        |
| 1 | 0 | 0 | 2 | 100      |
+---+---+---+---+----------+
I tried the following:
from pyspark.sql import SparkSession, functions as F

def getSums(df):
    choices = ['A', 'B', 'C', 'D']
    arg = {}
    answers = [column for column in df.columns if column.startswith("Ans")]
    for a in answers:
        arg[a] = 'sum'
    sums = df.agg(arg).withColumn('idx', F.lit(None))
    sums = sums.select(*(F.col(i).alias(i.replace("(", '_').replace(')', '')) for i in sums.columns))
    s = [f",'{l}'" + f",{column}" for column in sums.columns for l in choices if f"_{l}_" in column]
    unpivotExpr = "stack(4" + ''.join(map(str, s)) + ") as (A,B,C,D)"
    unpivotDF = sums.select('idx', F.expr(unpivotExpr))
    result = unpivotDF
    return result
I renamed the columns because I assumed the parentheses added by .agg() would cause a syntax error in the stack() expression. The error occurs at unpivotDF = sums.select('idx', F.expr(unpivotExpr)). I misunderstood how the stack() function works and assumed it would unpivot the listed columns and rename them to whatever was in the parentheses.
I get the following error:
AnalysisException: The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 200 aliases but got A,B,C,D
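For reference (worked out after the fact): stack(n, v1, ..., vk) produces n rows of k/n columns, so with 800 values after the 4 my expression outputs 200 columns per row, which is why Spark expected 200 aliases. A minimal illustration (names are made up):

spark.sql("SELECT stack(2, 'A', 1, 'B', 2) AS (choice, total)").show()
# +------+-----+
# |choice|total|
# +------+-----+
# |     A|    1|
# |     B|    2|
# +------+-----+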
Any alternative approaches or solutions without pyspark.pandas would be greatly appreciated.
Upvotes: 0
Views: 38
Reputation: 4108
The logic is implemented step by step in the commented code below. Note: change the input parameters to match your data.
# ACTION: Change input parameters
total_ques = 3
que_cats = ["A", "B", "C", "D"]
import pyspark.sql.functions as F
# Sum columns
result_df = df.select([F.sum(x).alias(x) for x in df.columns if x not in ["misc.1", "misc.2"]])
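With the sample dataset at the bottom of this answer, this yields a single row of per-column totals; an illustrative excerpt:

result_df.select("Answer_A_1", "Answer_A_2", "Answer_A_3").show()
# +----------+----------+----------+
# |Answer_A_1|Answer_A_2|Answer_A_3|
# +----------+----------+----------+
# |         0|         3|         3|
# +----------+----------+----------+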
# Collect scores as array: "A" = ["A1", "A2", "A3"]. Repeat to "B", "C" & "D".
for c in que_cats:
    col_list = [x for x in result_df.columns if f"Answer_{c}_" in x]
    result_df = result_df.withColumn(c, F.array(col_list))
result_df = result_df.select(que_cats)
result_df = result_df.withColumn("Question", F.array([F.lit(i) for i in range(1,total_ques+1)]))
# Zip as [{"A1", "B1", "C1", "D1"}, {"A2", "B2", "C2", "D2"}, ...]
final_cols = result_df.columns
result_df = result_df.select(F.arrays_zip(*final_cols).alias("zipped"))
# Explode to separate rows for each question
result_df = result_df.select(F.explode("zipped").alias("zipped"))
# Split by fields "A", "B", "C", "D"
for c in final_cols:
    result_df = result_df.withColumn(c, result_df.zipped.getField(c))
result_df = result_df.select(final_cols)
Output:
+---+---+---+---+--------+
|A |B |C |D |Question|
+---+---+---+---+--------+
|0 |3 |0 |3 |1 |
|3 |0 |3 |0 |2 |
|3 |0 |3 |3 |3 |
+---+---+---+---+--------+
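For completeness, the stack() from the question can also be made to work: its first argument is the number of output rows, and each output row needs one value per alias, so emit the question number together with the four per-choice sums for that question. A sketch under the same column-naming assumptions:

sums = df.select([F.sum(c).alias(c) for c in df.columns if c.startswith("Answer_")])
parts = []
for q in range(1, total_ques + 1):
    parts.append(str(q))
    parts.extend(f"Answer_{ch}_{q}" for ch in que_cats)
unpivot_expr = f"stack({total_ques}, {', '.join(parts)}) as (Question, A, B, C, D)"
result_df = sums.select(F.expr(unpivot_expr))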
Sample dataset used:
df = spark.createDataFrame(data=[
    ["James", 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1],
    ["Anna", 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1],
    ["Robert", 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1],
], schema=["misc.1", "Answer_A_1", "Answer_A_2", "Answer_A_3",
           "Answer_B_1", "Answer_B_2", "Answer_B_3",
           "Answer_C_1", "Answer_C_2", "Answer_C_3",
           "Answer_D_1", "Answer_D_2", "Answer_D_3"])
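If it helps, the whole pipeline can be wrapped into one reusable function. This is only a sketch: answer_totals is a hypothetical name, and it assumes the answer columns are named Answer_<choice>_<question> with every question present for every choice.

import pyspark.sql.functions as F

def answer_totals(df, choices=("A", "B", "C", "D")):
    # Hypothetical helper wrapping the steps above.
    # Assumes answer columns are named Answer_<choice>_<question>.
    answer_cols = [c for c in df.columns if c.startswith("Answer_")]
    n_questions = max(int(c.rsplit("_", 1)[1]) for c in answer_cols)
    # One row of per-column totals.
    sums = df.select([F.sum(c).alias(c) for c in answer_cols])
    # One array per choice, ordered by question number.
    for ch in choices:
        sums = sums.withColumn(ch, F.array([f"Answer_{ch}_{q}" for q in range(1, n_questions + 1)]))
    sums = sums.select(list(choices))
    # Attach question numbers, zip the arrays, explode to one row per question.
    sums = sums.withColumn("Question", F.array([F.lit(q) for q in range(1, n_questions + 1)]))
    cols = sums.columns
    zipped = sums.select(F.explode(F.arrays_zip(*cols)).alias("z"))
    return zipped.select([F.col("z").getField(c).alias(c) for c in cols])

Called as answer_totals(df), this should return the same A/B/C/D/Question layout as the output above.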
Upvotes: 1