Zac

Reputation: 21

PySpark error: Append output mode not supported when there are streaming aggregations on streaming DataFrames without watermark

I have a Kafka topic streaming user data, with each message in the following format:

message1 = {'username': 'user1', 'account_balance': 25.0}
message2 = {'username': 'user2', 'account_balance': 2.0}
message3 = {'username': 'user2', 'account_balance': 4.0}
message4 = {'username': 'user1', 'account_balance': 27.0}

I am reading this data with PySpark Structured Streaming and want to compute the running average of account_balance per user, over all messages received so far. The expected aggregated DataFrame after receiving each of the above messages is:

agg_df1 = {'username': 'user1', 'account_balance': 25.0}
agg_df2 = {'username': 'user1', 'account_balance': 25.0}, {'username': 'user2', 'account_balance': 2.0}
agg_df3 = {'username': 'user1', 'account_balance': 25.0}, {'username': 'user2', 'account_balance': 3.0}
agg_df4 = {'username': 'user1', 'account_balance': 26.0}, {'username': 'user2', 'account_balance': 3.0}
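
To make the intended aggregation concrete, here is a minimal plain-Python sketch (no Spark involved) of the running per-user average over the four sample messages. The function name running_averages and the snapshot layout (a dict mapping username to average) are only illustrative and are not part of my actual job:

```python
from collections import defaultdict

# The four sample messages from the Kafka topic, in arrival order.
messages = [
    {"username": "user1", "account_balance": 25.0},
    {"username": "user2", "account_balance": 2.0},
    {"username": "user2", "account_balance": 4.0},
    {"username": "user1", "account_balance": 27.0},
]


def running_averages(messages):
    """Yield, after each message, the per-user average of all balances seen so far."""
    totals = defaultdict(float)  # sum of balances per user
    counts = defaultdict(int)    # number of messages per user
    for msg in messages:
        user = msg["username"]
        totals[user] += msg["account_balance"]
        counts[user] += 1
        # Snapshot of every user's average since the beginning of the stream.
        yield {u: totals[u] / counts[u] for u in totals}


snapshots = list(running_averages(messages))
for snapshot in snapshots:
    print(snapshot)
```

Each printed snapshot corresponds to one agg_df: the full set of per-user averages at that point, which is what I would like to append to the table together with a timestamp.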

After this, I want to append each agg_df to a Cassandra table, with a timestamp for each message. I have the following PySpark script to do this:

# parsed_df holds the data parsed from the Kafka topic messages
# parsed_df = DataFrame[username: string, account_balance: float]
# make_uuid() is a helper defined elsewhere (not shown)
from pyspark.sql.functions import avg, current_timestamp

summary_df = parsed_df.groupBy("username") \
    .agg(avg("account_balance").alias("account_balance_avg")) \
    .withColumn("uuid", make_uuid()) \
    .withColumn("ingest_timestamp", current_timestamp())

summary_df.writeStream.outputMode("append").format("org.apache.spark.sql.cassandra") \
  .option("checkpointLocation", "/tmp/check_point/") \
  .options(table="avg_account_balance", keyspace="user") \
  .start()

However, the last statement throws the following error:

Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
  File "/opt/spark/python/pyspark/sql/streaming/readwriter.py", line 1385, in start
    return self._sq(self._jwrite.start())
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/spark/python/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;

The problem is that I don't want to use a watermark, because I want the average computed over all data from the beginning of the stream.

I tried using outputMode("complete") when writing to Cassandra, but that throws a different error:

java.lang.UnsupportedOperationException: You are attempting to use overwrite mode which will truncate
this table prior to inserting data. If you would merely like
to change data already in the table use the "Append" mode.
To actually truncate please pass in true value to the option
"confirm.truncate" or set that value to true in the session.conf
 when saving.

However, using outputMode("complete") works when I write to console as follows:

parsed_df.groupBy("username") \
    .agg(avg("account_balance").alias("account_balance_avg")) \
    .withColumn("uuid", make_uuid()) \
    .withColumn("ingest_timestamp", current_timestamp()) \
    .writeStream.outputMode("complete").format("console").start()

Upvotes: 1

Views: 376
