fatmali

Reputation: 122

KafkaConsumer is not safe for multi-threaded access pyspark

I am using Spark Structured Streaming with Kafka; however, when I try to write the stream to the console I get this error:

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

Here is my code:

def group_obs(obs_df):
    obs = obs_df.select(f.col("obs.payload.after").alias("obs"))

    filtered_obs_with_value = obs \
        .union(obs.filter("obs.value_datetime is not null")
               .withColumn("value", f.col("obs.value_datetime"))
               .withColumn("value_type", f.lit("datetime")))

    grouped_by_obsgroup = filtered_obs_with_value \
        .groupBy("obs.obs_group_id", "obs.encounter_id") \
        .agg(f.struct(f.col("obs.obs_group_id"),
                      f.collect_list("tempObs").alias("obs")).alias("obs"))

    query = grouped_by_obsgroup \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .start()

    query.awaitTermination()

raw_obs = kafka_stream.select(f.from_json(f.col("value").cast("string"), mySchema).alias("obs"))
transformed_obs = group_obs(raw_obs)

Upvotes: 3

Views: 2032

Answers (1)

user9714305

There is nothing specifically wrong with your code.

That's a known bug, tracked as SPARK-23636. There is a similar issue with the direct DStream API, tracked as SPARK-19185.

According to the JIRA ticket:

Only workaround is to start our applications with executor-cores = 1, with dynamic resource allocation enabled.

which may or may not be acceptable in your case.
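Concretely, that workaround amounts to a submit command along these lines (a sketch; the application file name is a placeholder, and dynamic allocation also requires the external shuffle service to be enabled on your cluster):

```shell
# Limit each executor to a single core so that only one task thread
# ever touches a given cached KafkaConsumer at a time.
spark-submit \
  --executor-cores 1 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  your_streaming_app.py
```

The trade-off is reduced per-executor parallelism, which is why it may not be acceptable for every workload.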

Upvotes: 4
