mattjoe182

Reputation: 21

Multiple writeStreams in Spark Structured Streaming (PySpark)

I have successfully implemented a single writeStream in PySpark, but once I add a second writeStream, only the first gets printed to the console. Here is my code:

import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.scheduler.allocation.file", "file:///opt/spark/conf/fairscheduler.xml")

spark = SparkSession \
    .builder \
    .appName("SparkStreaming") \
    .config(conf=conf) \
    .getOrCreate()

# Run all queries in the same fair-scheduler pool
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")

schema = StructType([ 
    StructField("text", StringType(), True),
    StructField("created_at" , TimestampType(), True)
    ])

# Read raw JSON lines from a local socket and parse them into columns
tweets_df1 = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("tmp")).select("tmp.*")

# First query: print the parsed tweets to the console
q1 = tweets_df1 \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk1") \
    .trigger(processingTime='5 seconds') \
    .start()

# Second query: add a literal column and print to the console
q2 = tweets_df1 \
    .withColumn("foo", F.lit("foo")) \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk2") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()

And here is my output:

-------------------------------------------
Batch: 0
-------------------------------------------
-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------+
|text|created_at|
+----+----------+
+----+----------+

+----+----------+---+
|text|created_at|foo|
+----+----------+---+
+----+----------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-------------------+
|                text|         created_at|
+--------------------+-------------------+
|Qatar posting for...|2022-12-16 20:23:06|
+--------------------+-------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-------------------+
|                text|         created_at|
+--------------------+-------------------+
|Who will win this...|2022-12-16 20:23:13|
+--------------------+-------------------+

The DataFrames with the foo column stop after batch 0, meaning the second writeStream is not running. I can confirm this from the checkpoint folder for each writeStream. Most of the solutions to this problem are in Scala, and I have tried to translate them to PySpark.

Is this just something that is not possible in PySpark?

Upvotes: 2

Views: 1104

Answers (1)

Alex Ott

Reputation: 87359

Most probably this happens because the socket source can be consumed only once, so one of the streams is "winning". If you want multiple consumers, consider putting your messages into something durable, for example Kafka; then each stream can consume the messages independently of the other.
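A minimal sketch of that approach, assuming a local Kafka broker at localhost:9092 and a topic named tweets (both placeholders), and that the spark-sql-kafka package matching your Spark version is on the classpath:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("SparkStreaming").getOrCreate()

schema = StructType([
    StructField("text", StringType(), True),
    StructField("created_at", TimestampType(), True)
])

# Each call creates its own Kafka consumer, so the two queries read
# the topic independently instead of competing for one socket connection.
def read_tweets():
    return spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "tweets") \
        .load() \
        .select(F.from_json(F.col("value").cast("string"), schema).alias("tmp")) \
        .select("tmp.*")

q1 = read_tweets() \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk1") \
    .trigger(processingTime="5 seconds") \
    .start()

q2 = read_tweets() \
    .withColumn("foo", F.lit("foo")) \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk2") \
    .trigger(processingTime="5 seconds") \
    .start()

spark.streams.awaitAnyTermination()

With a durable source like this, each query also tracks its own offsets in its checkpoint directory, so both make independent progress.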

Upvotes: 1
