Reputation:
I have a problem: I am trying to receive messages from Pub/Sub Lite in real time on a Spark cluster on GCP, but they arrive grouped in blocks of one minute.
My code:
producer.py
import os
import random
import time

from proj_BOLSA import settings
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/vm-sergiolr-development/Desktop/projecteemo_code/proj_BOLSA/credentials/gcp_authentication.json"

regional = True

if regional:
    location = CloudRegion(settings.REGION)
else:
    location = CloudZone(CloudRegion(settings.REGION), settings.ZONE)

topic_path = TopicPath(settings.PROJECT_NUMBER, location, settings.TOPIC)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    for i in range(6000):
        data = "number: " + str(random.randint(0, 300))
        api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
        # result() blocks. To resolve API futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to {topic_path} with partition "
            f"{message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )
        time.sleep(20)
consumer.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
project_number = xxxxxxxx
location = "europe-west1"
subscription_id = "s_producer"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .option("rowsPerSecond", 1)
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .option("truncate", False)
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination()
query.stop()
Results:
-------------------------------------------
Batch: 1
-------------------------------------------
+---------------------------------------------------------------------+---------+------+---+-----------+--------------------------+---------------+----------+
|subscription |partition|offset|key|data |publish_timestamp |event_timestamp|attributes|
+---------------------------------------------------------------------+---------+------+---+-----------+--------------------------+---------------+----------+
|projects/658599344059/locations/europe-west1/subscriptions/s_producer|0 |5942 |[] |number: 74 |2022-08-05 08:42:47.796738|null |{} |
|projects/658599344059/locations/europe-west1/subscriptions/s_producer|0 |5943 |[] |number: 288|2022-08-05 08:43:07.849063|null |{} |
|projects/658599344059/locations/europe-west1/subscriptions/s_producer|0 |5944 |[] |number: 156|2022-08-05 08:43:27.952513|null |{} |
+---------------------------------------------------------------------+---------+------+---+-----------+--------------------------+---------------+----------+
-------------------------------------------
Batch: 2
-------------------------------------------
+---------------------------------------------------------------------+---------+------+---+-----------+--------------------------+---------------+----------+
|subscription |partition|offset|key|data |publish_timestamp |event_timestamp|attributes|
+---------------------------------------------------------------------+---------+------+---+-----------+--------------------------+---------------+----------+
|projects/658599344059/locations/europe-west1/subscriptions/s_producer|0 |5945 |[] |number: 162|2022-08-05 08:43:48.00867 |null |{} |
|projects/658599344059/locations/europe-west1/subscriptions/s_producer|0 |5946 |[] |number: 262|2022-08-05 08:44:08.062032|null |{} |
|projects/658599344059/locations/europe-west1/subscriptions/s_producer|0 |5947 |[] |number: 59 |2022-08-05 08:44:28.11492 |null |{} |
+---------------------------------------------------------------------+---------+------+---+-----------+--------------------------+---------------+----------+
-------------------------------------------
Batch: 3
-------------------------------------------
+---------------------------------------------------------------------+---------+------+---+-----------+--------------------------+---------------+----------+
|subscription |partition|offset|key|data |publish_timestamp |event_timestamp|attributes|
+---------------------------------------------------------------------+---------+------+---+-----------+--------------------------+---------------+----------+
|projects/658599344059/locations/europe-west1/subscriptions/s_producer|0 |5948 |[] |number: 54 |2022-08-05 08:44:48.168997|null |{} |
|projects/658599344059/locations/europe-west1/subscriptions/s_producer|0 |5949 |[] |number: 206|2022-08-05 08:45:08.225344|null |{} |
|projects/658599344059/locations/europe-west1/subscriptions/s_producer|0 |5950 |[] |number: 109|2022-08-05 08:45:28.328074|null |{} |
+---------------------------------------------------------------------+---------+------+---+-----------+--------------------------+---------------+----------+
What am I doing wrong that prevents me from reading the messages as they arrive, instead of having them grouped in batches of about one minute?
Thank you!
Upvotes: 0
Views: 224
Reputation: 812
You are using the micro-batch streaming mode, where the Spark runtime decides how many messages to read from the source at a time. It is actually reading windows of roughly 30 seconds of data, not 1-minute windows.
To read smaller time windows for small amounts of data, you would need to use the experimental continuous processing mode: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing
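As a rough sketch only, the change would be in the trigger on your existing write stream; this assumes the Pub/Sub Lite connector accepts a continuous trigger (continuous processing only supports a subset of sources and sinks, so if the connector rejects it the query will fail at start and micro-batch remains the fallback):

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    # Experimental continuous trigger with a 1-second checkpoint interval,
    # replacing .trigger(processingTime="1 second").
    .trigger(continuous="1 second")
    .option("truncate", False)
    .start()
)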
Upvotes: 0