Zach

Reputation: 1351

Pandas on Spark Resample: "Rule Code Not Supported" & "TypeError: Type datetime64[us] was not understood"

I'm trying to do something straightforward in pandas: take some time-series data and resample it to one-minute buckets. But I'm running into a variety of issues from Spark (I'm new to PySpark, so be kind ;) )

I'm doing this via "Pandas on Spark" because pivot operations are painful in plain Spark.

For example, I currently have something that looks like this:

# spark.version == 3.5.0-amzn-0

df = spark.sql('select * from stream_view;').pandas_api()
# type(df) == <class 'pyspark.pandas.frame.DataFrame'>

df.head() ==

timestamp                      A        B        C                                                                              
2024-05-03 15:25:27.820        0        0        3
2024-05-03 15:25:40.721        2        0        3
2024-05-03 15:26:17.812        1        1        2
2024-05-03 15:26:50.690        3        1        2
2024-05-03 15:27:17.807        3        0        1

And it needs to be transformed to this:

timestamp                  A        B        C                                                                              
2024-05-03 15:25:00        1        0        3
2024-05-03 15:26:00        2        1        2
2024-05-03 15:27:00        3        0        1

If I were doing this in vanilla pandas I'd simply do df.resample('1min').mean() and call it a day. According to the docs, that should be possible in pandas-on-Spark too, but it is not.
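For reference, this is the plain-pandas behaviour I'm trying to reproduce (a self-contained sketch built from the sample rows above):

import pandas as pd

# Plain pandas: build the sample frame and resample to one-minute means.
pdf = pd.DataFrame(
    {"A": [0, 2, 1, 3, 3], "B": [0, 0, 1, 1, 0], "C": [3, 3, 2, 2, 1]},
    index=pd.to_datetime([
        "2024-05-03 15:25:27.820", "2024-05-03 15:25:40.721",
        "2024-05-03 15:26:17.812", "2024-05-03 15:26:50.690",
        "2024-05-03 15:27:17.807",
    ]),
)
pdf.index.name = "timestamp"
print(pdf.resample("1min").mean())  # matches the target table (as floats)

But on the pandas-on-Spark DataFrame: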

df.resample('min')  # also 1min

# throws this error:

An error was encountered:
rule code min is not supported
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000001/pyspark.zip/pyspark/pandas/frame.py", line 13373, in resample
    return DataFrameResampler(
  File "/mnt1/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000001/pyspark.zip/pyspark/pandas/resample.py", line 736, in __init__
    super().__init__(
  File "/mnt1/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000001/pyspark.zip/pyspark/pandas/resample.py", line 107, in __init__
    raise ValueError("rule code {} is not supported".format(self._offset.rule_code))
ValueError: rule code min is not supported

I also tried the other valid rule codes (s, h, etc.), but none of them work; I checked against the Spark source and they all fail the same check.
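Out of curiosity, I also checked what rule code pandas itself hands over (this assumes pandas >= 2.2 on the driver, where the old uppercase aliases were renamed; on older pandas these print 'T', 'S', 'H'):

from pandas.tseries.frequencies import to_offset

# The rule_code Spark inspects comes straight from pandas, and on
# pandas >= 2.2 it is the new lowercase form, the exact string in the error.
print(to_offset('1min').rule_code)  # 'min'
print(to_offset('s').rule_code)     # 's'
print(to_offset('h').rule_code)     # 'h'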

Because I originally misread the error message and thought it had something to do with a floor operation, I tried doing the flooring manually. That opened up new issues.

# type(df.index) == <class 'pyspark.pandas.indexes.datetimes.DatetimeIndex'>

df.index.floor('s')

# but then get this error:

An error was encountered:

  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000001/pyspark.zip/pyspark/pandas/accessors.py", line 955, in pudf
  File "/mnt1/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000001/pyspark.zip/pyspark/pandas/groupby.py", line 2302, in rename_output
  File "/mnt/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000004/pyspark.zip/pyspark/pandas/internal.py", line 1629, in prepare_pandas_frame
    spark_type = infer_pd_series_spark_type(reset_index[col], dtype, prefer_timestamp_ntz)
  File "/mnt/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000004/pyspark.zip/pyspark/pandas/typedef/typehints.py", line 368, in infer_pd_series_spark_type
    return as_spark_type(dtype, prefer_timestamp_ntz=prefer_timestamp_ntz)
  File "/mnt/yarn/usercache/livy/appcache/application_1714638802442_0031/container_1714638802442_0031_01_000004/pyspark.zip/pyspark/pandas/typedef/typehints.py", line 248, in as_spark_type
    raise TypeError("Type %s was not understood." % tpe)
TypeError: Type datetime64[us] was not understood.

I tried this immediately after converting the Spark DataFrame to a pandas-on-Spark DataFrame and got the same error, so it's not something I'm doing to the index's type along the way.

I also tried df.index.round() and that too had the same issue.
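To double-check that it's the microsecond unit itself and not something about my frame, I called the helper from the traceback directly (it's an internal API, so this is purely diagnostic):

import numpy as np
from pyspark.pandas.typedef.typehints import as_spark_type

# Internal helper from the traceback above, used here only to diagnose:
as_spark_type(np.dtype('datetime64[ns]'))  # fine, maps to a Spark timestamp type
as_spark_type(np.dtype('datetime64[us]'))  # raises the same TypeError as above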

Is this API super buggy, or am I doing something very wrong? Probably the latter, considering how new I am to PySpark (hopefully). Either way, I'd really appreciate the help, because I've been fighting this all day.
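In the meantime, this plain-Spark fallback does produce the table I want (sketched against the same stream_view table), but I'd still like to understand what's going wrong with the pandas-on-Spark API:

from pyspark.sql import functions as F

# Plain-Spark fallback: truncate each timestamp to the minute, then average.
sdf = spark.sql('select * from stream_view')
resampled = (
    sdf.groupBy(F.date_trunc('minute', 'timestamp').alias('timestamp'))
       .agg(F.avg('A').alias('A'), F.avg('B').alias('B'), F.avg('C').alias('C'))
       .orderBy('timestamp')
)
df = resampled.pandas_api(index_col='timestamp')  # back to pandas-on-Spark for the pivots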

Thanks in advance :)

Upvotes: 0

Views: 135

Answers (0)
