Reputation: 122
I am using Spark Structured Streaming with Kafka, but when I try to write the stream to the console I get the error:
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
Here is my code:
from pyspark.sql import functions as f
from pyspark.sql.functions import col, from_json

def group_obs(obs_df):
    obs = obs_df.select(f.col("obs.payload.after").alias("obs"))
    filtered_obs_with_value = obs \
        .union(obs.filter("obs.value_datetime is not null")
               .withColumn("value", f.col("obs.value_datetime"))
               .withColumn("value_type", f.lit("datetime")))
    grouped_by_obsgroup = filtered_obs_with_value \
        .groupBy("obs.obs_group_id", "obs.encounter_id") \
        .agg(f.struct(f.col("obs.obs_group_id"),
                      f.collect_list("tempObs").alias("obs")).alias("obs"))
    query = grouped_by_obsgroup \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .start()
    query.awaitTermination()

raw_obs = kafka_stream.select(from_json(col("value").cast("string"), mySchema).alias("obs"))
transformed_obs = group_obs(raw_obs)
Upvotes: 3
Views: 2032
There is nothing specifically wrong with your code.
This is a known bug, tracked by SPARK-23636. There is also a similar issue with the direct DStream API, tracked by SPARK-19185.
According to the JIRA ticket:
Only workaround is to start our applications with executor-cores = 1, with dynamic resource allocation enabled.
which may or may not be acceptable in your case.
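In practice, the workaround quoted above means submitting the job with a single core per executor, so that only one thread ever touches a given cached KafkaConsumer, and relying on dynamic allocation to regain parallelism by adding executors rather than threads. A minimal sketch of such a submission (the script name is a placeholder, and the shuffle-service setting assumes a YARN deployment; other cluster managers configure dynamic allocation differently):

```shell
# One core per executor: a single task thread per JVM, so the
# cached KafkaConsumer is never accessed concurrently.
# Dynamic allocation scales out by adding executors instead;
# on YARN it requires the external shuffle service.
spark-submit \
  --conf spark.executor.cores=1 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  your_streaming_app.py
```

The trade-off is more (smaller) executor JVMs for the same total core count, which may or may not be acceptable in your deployment.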
Upvotes: 4