Reputation: 21
I have successfully implemented a single writeStream in PySpark, but once I add a second writeStream, only the first is printed to the console. Here is my code:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.scheduler.allocation.file", "file:///opt/spark/conf/fairscheduler.xml")

spark = SparkSession \
    .builder \
    .appName("SparkStreaming") \
    .config(conf=conf) \
    .getOrCreate()

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")

schema = StructType([
    StructField("text", StringType(), True),
    StructField("created_at", TimestampType(), True)
])

tweets_df1 = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("tmp")) \
    .select("tmp.*")

q1 = tweets_df1 \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk1") \
    .trigger(processingTime='5 seconds') \
    .start()

q2 = tweets_df1 \
    .withColumn("foo", F.lit("foo")) \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "/home/ubuntu/apache-spark-streaming-twitter-1/chk2") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()
And here is my output:
-------------------------------------------
Batch: 0
-------------------------------------------
-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------+
|text|created_at|
+----+----------+
+----+----------+
+----+----------+---+
|text|created_at|foo|
+----+----------+---+
+----+----------+---+
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-------------------+
| text| created_at|
+--------------------+-------------------+
|Qatar posting for...|2022-12-16 20:23:06|
+--------------------+-------------------+
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-------------------+
| text| created_at|
+--------------------+-------------------+
|Who will win this...|2022-12-16 20:23:13|
+--------------------+-------------------+
The dataframe with the foo column stops after batch 0, meaning the second writeStream is not running. I can confirm this from each writeStream's checkpoint folder. Most of the solutions to this problem are in Scala, and I have tried translating them to PySpark.
Is this just something that is not possible in PySpark?
Upvotes: 2
Views: 1104
Reputation: 87359
Most likely this happens because a socket can be consumed only once, so one of the streams "wins". If you want multiple consumers, consider putting your messages into something durable, for example Kafka - then each stream can consume the messages independently of the other.
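For illustration, here is a minimal sketch of the Kafka variant. The broker address localhost:9092, the topic name tweets, and the checkpoint paths are placeholders, and it assumes the spark-sql-kafka-0-10 package is on the classpath. The point is that each started query gets its own Kafka consumer, so both queries receive every message:

import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkStreaming").getOrCreate()

schema = StructType([
    StructField("text", StringType(), True),
    StructField("created_at", TimestampType(), True)
])

# Kafka source instead of socket: messages are durable and replayable,
# so multiple queries can each read the full stream.
tweets_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
    .option("subscribe", "tweets")                        # placeholder topic
    .load()
    .select(F.from_json(F.col("value").cast("string"), schema).alias("tmp"))
    .select("tmp.*")
)

# Two independent queries over the same source; each needs its own checkpoint.
q1 = (
    tweets_df.writeStream
    .outputMode("append")
    .format("console")
    .option("checkpointLocation", "/tmp/chk1")
    .trigger(processingTime="5 seconds")
    .start()
)

q2 = (
    tweets_df.withColumn("foo", F.lit("foo"))
    .writeStream
    .outputMode("append")
    .format("console")
    .option("checkpointLocation", "/tmp/chk2")
    .trigger(processingTime="5 seconds")
    .start()
)

spark.streams.awaitAnyTermination()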
Upvotes: 1