Reputation: 1351
I'm trying to do something straightforward in Pandas -- take some time series data and resample it to one-minute intervals. But I'm running into a variety of issues in Spark (I'm new to PySpark, so be kind ;) )
I'm doing this via "Pandas on Spark" because pivot operations aren't handled well in plain Spark.
For example, I currently have something that looks like this:
# spark.version == 3.5.0-amzn-0
df = spark.sql('select * from stream_view;').pandas_api()
# type(df) == <class 'pyspark.pandas.frame.DataFrame'>
df.head() ==
timestamp A B C
2024-05-03 15:25:27.820 0 0 3
2024-05-03 15:25:40.721 2 0 3
2024-05-03 15:26:17.812 1 1 2
2024-05-03 15:26:50.690 3 1 2
2024-05-03 15:27:17.807 3 0 1
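In case it helps to reproduce, here's a synthetic stand-in for that frame (a sketch; it's built with ps.from_pandas rather than the real stream_view query, with the values copied from head() above):
import pandas as pd
import pyspark.pandas as ps

# synthetic stand-in for stream_view, same values as head() above
pdf = pd.DataFrame(
    {
        'timestamp': pd.to_datetime([
            '2024-05-03 15:25:27.820',
            '2024-05-03 15:25:40.721',
            '2024-05-03 15:26:17.812',
            '2024-05-03 15:26:50.690',
            '2024-05-03 15:27:17.807',
        ]),
        'A': [0, 2, 1, 3, 3],
        'B': [0, 0, 1, 1, 0],
        'C': [3, 3, 2, 2, 1],
    }
).set_index('timestamp')

df = ps.from_pandas(pdf)  # pandas-on-Spark DataFrame with a DatetimeIndex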
And it needs to be transformed to this:
timestamp A B C
2024-05-03 15:25:00 1 0 3
2024-05-03 15:26:00 2 1 2
2024-05-03 15:27:00 3 0 1
If I were doing this in vanilla pandas I'd simply do df.resample('1min').mean()
and call it a day. According to the docs that should be possible in PySpark too, but it is not.
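For reference, the plain-pandas version behaves exactly as I want on the synthetic frame above (a sketch; pdf is from the snippet further up):
pdf.resample('1min').mean()
#                        A    B    C
# timestamp
# 2024-05-03 15:25:00  1.0  0.0  3.0
# 2024-05-03 15:26:00  2.0  1.0  2.0
# 2024-05-03 15:27:00  3.0  0.0  1.0
On the pandas-on-Spark DataFrame, however: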
df.resample('min')  # also tried '1min'
# throws this error:
An error was encountered:
rule code min is not supported
Traceback (most recent call last):
File "/mnt1/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000001/pyspark.zip/pyspark/pandas/frame.py", line 13373, in resample
return DataFrameResampler(
File "/mnt1/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000001/pyspark.zip/pyspark/pandas/resample.py", line 736, in __init__
super().__init__(
File "/mnt1/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000001/pyspark.zip/pyspark/pandas/resample.py", line 107, in __init__
raise ValueError("rule code {} is not supported".format(self._offset.rule_code))
ValueError: rule code min is not supported
I also tried other valid rule codes (s, h, etc.), but none of them work (verified in the source code).
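For what it's worth, the rule code that pandas produces for a given alias can be inspected like this (a sketch; the exact strings depend on the installed pandas version, and the traceback above shows it's the lowercase 'min' in my environment):
import pandas as pd

pd.tseries.frequencies.to_offset('1min').rule_code  # 'min' here; older pandas returns 'T'
pd.tseries.frequencies.to_offset('1s').rule_code    # 's' here; older pandas returns 'S'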
Because I originally misunderstood the error message and thought it had something to do with a floor operation, I tried doing the flooring manually, which opened up new issues.
# type(df.index) == <class 'pyspark.pandas.indexes.datetimes.DatetimeIndex'>
df.index.floor('s')
# but then I get this error:
An error was encountered:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/mnt1/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000001/pyspark.zip/pyspark/pandas/accessors.py", line 955, in pudf
File "/mnt1/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000001/pyspark.zip/pyspark/pandas/groupby.py", line 2302, in rename_output
File "/mnt/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000004/pyspark.zip/pyspark/pandas/internal.py", line 1629, in prepare_pandas_frame
spark_type = infer_pd_series_spark_type(reset_index[col], dtype, prefer_timestamp_ntz)
File "/mnt/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000004/pyspark.zip/pyspark/pandas/typedef/typehints.py", line 368, in infer_pd_series_spark_type
return as_spark_type(dtype, prefer_timestamp_ntz=prefer_timestamp_ntz)
File "/mnt/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000004/pyspark.zip/pyspark/pandas/typedef/typehints.py", line 248, in as_spark_type
raise TypeError("Type %s was not understood." % tpe)
TypeError: Type datetime64[us] was not understood.
I tried this immediately after converting from a Spark DataFrame to a Pandas on Spark DataFrame and got the same error, so it's not something I'm doing to the index's type along the way.
I also tried df.index.round() and that had the same issue.
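For context, what I'm ultimately trying to express is roughly the following in the plain DataFrame API (a sketch with made-up variable names; I'd rather not hand-write queries like this, since my real pipeline also involves pivots):
from pyspark.sql import functions as F

# truncate each timestamp to its minute and average the numeric columns
sdf = spark.sql('select * from stream_view;')
per_minute = (
    sdf.groupBy(F.date_trunc('minute', F.col('timestamp')).alias('timestamp'))
       .agg(F.avg('A').alias('A'), F.avg('B').alias('B'), F.avg('C').alias('C'))
       .orderBy('timestamp')
)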
Is this API super buggy or am I doing something very wrong? Probably the latter considering how new I am to PySpark (hopefully). In any case, I would really appreciate the help because I've been fighting this all day.
Thanks in advance :)
Upvotes: 0
Views: 135