Reputation: 3931
I'm running the following expression using a PySpark DataFrame:
md = data.filter(data['cluster_id'].like('cluster30')) \
    .select(
        udf_make_date(
            fn.year(data['request_timestamp']),
            fn.month(data['request_timestamp']),
            fn.dayofmonth(data['request_timestamp'])
        ),
        who_assigned,
        fn.hour(data['request_timestamp']).alias('request_hour'),
        fn.date_format(
            data['request_timestamp'],
            'F').alias('request_day_of_week'),
        fn.lit(data.count()).alias('num_requests'),
        fn.countDistinct(data['user_id']).alias('num_users'),
        fn.avg(data['microseconds']).alias(
            'avg_response_time_microseconds')) \
    .groupBy(
        udf_make_date(
            fn.year(data['request_timestamp']),
            fn.month(data['request_timestamp']),
            fn.dayofmonth(data['request_timestamp'])
        ),
        who_assigned,
        fn.hour(data['request_timestamp']),
        fn.date_format(
            data['request_timestamp'],
            'F')
    )
and am receiving the following error:
pyspark.sql.utils.AnalysisException: "expression '`request_timestamp`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;".
As far as I can tell, I'm including everything I need in the groupBy... I wrote this to mirror the structure of my SQL query, which looks roughly like this:
SELECT
    MAKE_DATE(YEAR(request_timestamp), MONTH(request_timestamp), DAYOFMONTH(request_timestamp)),
    CASE
        lots of case logic here...
    HOUR(request_timestamp) AS request_hour,
    DATE_FORMAT(request_timestamp, 'F') request_day_of_week,
    COUNT(*) AS num_requests,
    COUNT(DISTINCT user_id) num_users,
    AVG(microseconds) AS avg_response_time_microseconds
FROM
    (SELECT *
     FROM {table}
     WHERE cluster_id LIKE 'cluster30')
GROUP BY
    MAKE_DATE(YEAR(request_timestamp), MONTH(request_timestamp), DAYOFMONTH(request_timestamp)),
    CASE
        lots of case logic here...
    HOUR(request_timestamp),
    DATE_FORMAT(request_timestamp, 'F')
Upvotes: 1
Views: 1989
Reputation: 17862
In Spark, the groupBy comes before the aggregations, and every column passed to groupBy is automatically included in the result DataFrame. For your query, the equivalent in the Spark DataFrame API would be something like:
data \
    .filter(data['cluster_id'].like('cluster30')) \
    .groupBy(
        udf_make_date(
            fn.year(data['request_timestamp']),
            fn.month(data['request_timestamp']),
            fn.dayofmonth(data['request_timestamp'])
        ).alias('request_date'),
        who_assigned,
        fn.hour(data['request_timestamp']).alias('request_hour'),
        fn.date_format(
            data['request_timestamp'],
            'F'
        ).alias('request_day_of_week')
    ) \
    .agg(
        fn.countDistinct(data['user_id']).alias('num_users'),
        fn.count('*').alias('num_requests'),
        fn.avg(data['microseconds']).alias('avg_response_time_microseconds')
    )
Upvotes: 3