Reputation: 757
I'm running into an issue reading from GCP Pubsub in Dataflow: when I publish a large number of messages in a short period of time, Dataflow receives most of the sent messages, but some messages are lost and some others are duplicated. The weirdest part is that the number of lost messages is exactly the same as the number of duplicated messages.
In one of the experiments, I sent 4,000 messages in 5 seconds, and 4,000 messages were received in total, but 9 messages were lost and exactly 9 messages were duplicated.
The way I determine the duplicates is via logging: I log every message published to Pubsub along with the message id generated by Pubsub, and I also log each message right after reading it from PubsubIO in a ParDo transformation.
The way I read from Pubsub in Dataflow is using org.apache.beam.sdk.io.PubsubIO:
public interface Options extends GcpOptions, DataflowPipelineOptions {

    // PUBSUB URL
    @Description("Pubsub URL")
    @Default.String("https://pubsub.googleapis.com")
    String getPubsubRootUrl();
    void setPubsubRootUrl(String value);

    // TOPIC
    @Description("Topic")
    @Default.String("projects/test-project/topics/test_topic")
    String getTopic();
    void setTopic(String value);

    ...
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);
    options.setRunner(DataflowRunner.class);
    ...

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(PubsubIO.<String>read()
            .topic(options.getTopic())
            .withCoder(StringUtf8Coder.of()))
        .apply("Logging data coming out of Pubsub",
            ParDo.of(some_logging_transformation))
        .apply("Saving data into db",
            ParDo.of(some_output_transformation));

    pipeline.run().waitUntilFinish();
}
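The some_logging_transformation above is essentially a pass-through DoFn that logs every payload coming out of PubsubIO. A simplified sketch (the names are illustrative and the real DoFn differs in detail):

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Simplified sketch of the logging step: log each payload read from
// PubsubIO, then emit it unchanged so the rest of the pipeline continues.
static class LogMessagesFn extends DoFn<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(LogMessagesFn.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("Received from Pubsub: {}", c.element());
        c.output(c.element());
    }
}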
Is this a known issue in Pubsub or PubsubIO?
UPDATE: I tried sending 4,000 messages against the Pubsub emulator; no data was missing and there were no duplicates.
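For the emulator test the pipeline can be pointed at the local endpoint through the pubsubRootUrl option defined above; a minimal sketch, assuming the gcloud emulator is running on its default port 8085:

// Assumption: a local Pubsub emulator on port 8085 (the gcloud default).
// The endpoint can be overridden via the pubsubRootUrl option defined above,
// either in code or with --pubsubRootUrl=http://localhost:8085 on the command line.
options.setPubsubRootUrl("http://localhost:8085");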
UPDATE #2:
I ran some more experiments and found that the duplicated messages are taking the message_id from the missing ones. Because the direction of the issue has diverged quite a bit from its origin, I've decided to post another question with detailed logs as well as the code I used to publish and receive messages.
Link to the new question: Google Cloud Pubsub Data lost
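For reference, the comparison that surfaced this behaves roughly like the sketch below (simplified and hypothetical; the real code parses the actual publish-side and receive-side log files):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LogDiff {
    // publishLog maps each payload to the message_id Pubsub assigned at
    // publish time; receiveLog holds the (payload, message_id) pairs that
    // were logged right after PubsubIO.
    static void compareLogs(Map<String, String> publishLog, List<String[]> receiveLog) {
        Map<String, List<String>> receivedIdsByPayload = new HashMap<>();
        for (String[] entry : receiveLog) {
            receivedIdsByPayload
                .computeIfAbsent(entry[0], k -> new ArrayList<>())
                .add(entry[1]);
        }
        for (Map.Entry<String, String> pub : publishLog.entrySet()) {
            List<String> ids = receivedIdsByPayload.get(pub.getKey());
            if (ids == null) {
                System.out.println("missing: payload=" + pub.getKey() + " id=" + pub.getValue());
            } else if (ids.size() > 1) {
                // The ids of the missing messages show up here, attached to
                // payloads that were delivered more than once.
                System.out.println("duplicated: payload=" + pub.getKey() + " ids=" + ids);
            }
        }
    }
}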
Upvotes: 2
Views: 3273
Reputation: 757
I talked with someone from Google's Pubsub team, and the behavior seems to be caused by a thread-safety issue with the Python client. Please refer to the accepted answer of Google Cloud Pubsub Data lost for the response from Google.
Upvotes: 2