theMadKing

Reputation: 2074

pySpark DataFrames: passing a string of column names to .groupBy()

I have code that looks like this:

print df.groupBy('offer_id', 'record_id').avg().collect()

This works. But when I pass a string of column names, stringNamesDF, instead:

print df.groupBy(stringNamesDF).avg().collect()

it fails with:

org.apache.spark.sql.AnalysisException: cannot resolve ''record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr'' given input columns pymt, rescindable_days, rescinded_date, market_cell_id, offer_sales_script, assigned_offer_id, offer_desc, rate_index_type_cd, nbr_of_pymts, campaign_id, down_pymt, offer_status_type_cd, offer_type_cd, acct_expiration_dttm, record_id, origination_fee_rate, insrt_usr_id, promo_id, term_mm, min_amount, offer_good_until_date, decision_id, insrt_dttm, late_fee_min_amount, late_fee_percent, offer_id, origination_fee_amount, presentation_instrument_nbr, offer_order, chng_usr_id, correlation_id, acct_nbr_assigned_dttm, chng_dttm, presentable_flag, accepted_offer_flag, amount, min_rate, max_rate, acct_nbr, actv_flag, sub_product_id, cs_result_id, current_offer_flag, finance_charge, annual_fee_waived_mm, cs_result_usage_type_cd, max_amount, total_pymts, contract_date, index_rate, first_pymt_date, annual_fee_amount, rate, amount_financed, pymt_method_type_cd;

Printing stringNamesDF gives:

'record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr'

I've also tried stringNamesDF without the quotes:

record_id, assigned_offer_id, accepted_offer_flag, current_offer_flag, offer_good_until_date, rescinded_date, first_pymt_date, contract_date, acct_nbr, acct_nbr_assigned_dttm, acct_expiration_dttm, offer_desc, offer_sales_script, presentable_flag, insrt_dttm, insrt_usr_id, chng_dttm, chng_usr_id, actv_flag, correlation_id, offer_status_type_cd, presentation_instrument_nbr

but I get:

org.apache.spark.sql.AnalysisException: cannot resolve 'record_id, assigned_offer_id, accepted_offer_flag, current_offer_flag, offer_good_until_date, rescinded_date, first_pymt_date, contract_date, acct_nbr, acct_nbr_assigned_dttm, acct_expiration_dttm, offer_desc, offer_sales_script, presentable_flag, insrt_dttm, insrt_usr_id, chng_dttm, chng_usr_id, actv_flag, correlation_id, offer_status_type_cd, presentation_instrument_nbr' given input columns pymt, rescindable_days, rescinded_date, market_cell_id, offer_sales_script, assigned_offer_id, offer_desc, rate_index_type_cd, nbr_of_pymts, campaign_id, down_pymt, offer_status_type_cd, offer_type_cd, acct_expiration_dttm, record_id, origination_fee_rate, insrt_usr_id, promo_id, term_mm, min_amount, offer_good_until_date, decision_id, insrt_dttm, late_fee_min_amount, late_fee_percent, offer_id, origination_fee_amount, presentation_instrument_nbr, offer_order, chng_usr_id, correlation_id, acct_nbr_assigned_dttm, chng_dttm, presentable_flag, accepted_offer_flag, amount, min_rate, max_rate, acct_nbr, actv_flag, sub_product_id, cs_result_id, current_offer_flag, finance_charge, annual_fee_waived_mm, cs_result_usage_type_cd, max_amount, total_pymts, contract_date, index_rate, first_pymt_date, annual_fee_amount, rate, amount_financed, pymt_method_type_cd;

Edit: I've also tried passing a list, stringNames, which looks like this, with no success:

['record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr']

and get AttributeError: 'list' object has no attribute '_get_object_id'
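
Edit 2: a minimal reproduction of the list failure with a toy DataFrame (column names here are illustrative, not my real schema):

l = [('o1', 'r1', 1.0), ('o1', 'r1', 3.0), ('o2', 'r2', 2.0)]
df = sqlContext.createDataFrame(l, ['offer_id', 'record_id', 'amount'])
cols = ['offer_id', 'record_id']
print df.groupBy(cols).avg().collect()
# AttributeError: 'list' object has no attribute '_get_object_id'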

Upvotes: 0

Views: 3211

Answers (2)

kruhly

Reputation: 1

When stringNamesDF is a list:

stringNamesDF=['record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr']  

Use:

df.groupBy(*stringNamesDF).avg().collect()
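
The * unpacks the list, so each column name is passed to groupBy as its own positional argument, exactly as if you had typed df.groupBy('record_id', 'assigned_offer_id', ...) by hand.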

From https://spark.apache.org/docs/latest/api/python/pyspark.sql.html:

groupBy(*cols)

Groups the DataFrame using the specified columns, so we can run aggregation on them.

Parameters: cols – list of columns to group by. Each element should be a column name (string) or an expression (Column).

Example:

# assumes an existing SQLContext, e.g. the sqlContext provided by the pyspark shell
l = [('Alice', '2015-02-02', 1), ('Alice', '2015-02-02', 2), ('Alice', '2015-02-03', 1), ('Bob', '2015-02-03', 1), ('Bob', '2015-02-03', 3)]
ddf = sqlContext.createDataFrame(l, ['name', 'date', 'clicks'])
ddf.groupBy(*['name', 'date']).avg().collect()
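
For this toy data the call groups by (name, date) and averages clicks: ('Alice', '2015-02-02') gives 1.5, ('Alice', '2015-02-03') gives 1.0, and ('Bob', '2015-02-03') gives 2.0.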

Upvotes: -1

David Griffin

Reputation: 13927

Try:

print df.groupBy(*stringNamesDF.split(", ")).avg().collect()
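
Note that if stringNamesDF is the quoted form shown in the question, splitting alone leaves the single quotes inside each name and the resolve error persists. A sketch (assuming the ", " separator shown above) that strips the quotes first:

cols = [name.strip("'") for name in stringNamesDF.split(", ")]
print df.groupBy(*cols).avg().collect()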

Upvotes: 0
