Reputation: 340
I have a simple job (Apache Beam SDK for Java 2.2.0) that reads messages from a Pub/Sub subscription, reads configs from a side input, applies transformations to the messages, and sends the results to another Pub/Sub topic.
The issue is that the number of outgoing messages is not equal to the number of incoming messages. I'm inserting 15 million messages very quickly from another job (without manually specifying a timestamp). The issue seems to be tied to the side input: without it, no messages are lost. In Dataflow monitoring we can see about 20,000 lost messages.
Job ID on DataflowRunner: 2018-01-17_05_33_45-3290466857677892673
If I restart the same job, the number of lost messages is not the same.
I created simple snippets to illustrate my issue:
Publisher
String PROJECT_ID = "...";
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
p
    .apply(GenerateSequence.from(0).to(15000000))
    .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
    .apply(PubsubIO.writeStrings().to("projects/" + PROJECT_ID + "/topics/test_in"));
p.run();
Listener
String PROJECT_ID = "...";
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
PCollectionView<Long> sideInput = p
    .apply(GenerateSequence.from(0).to(10))
    .apply(Count.globally())
    .apply(View.asSingleton());
p
    // 15,000,000 in input
    .apply(PubsubIO.readMessages().fromSubscription("projects/" + PROJECT_ID + "/subscriptions/test_in"))
    .apply(ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element());
        }
    }).withSideInputs(sideInput))
    // 14,978,010 in output
    .apply(PubsubIO.writeMessages().to("projects/" + PROJECT_ID + "/topics/test_out"));
p.run();
Upvotes: 3
Views: 1909
Reputation: 17913
The issue is most likely due to late data dropping. You can address it by setting a windowing strategy with an infinite allowed lateness.
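To make "late data dropping" concrete, here is a toy model in plain Java. It is not Beam code and not how the runner is actually implemented; it is only a sketch, under simplifying assumptions, of the general rule that an element is discarded once the watermark has passed the end of its window plus the allowed lateness. The class name `LateDataModel`, the `Event` type, the fixed one-second windows, and the `INFINITE_LATENESS` sentinel are all hypothetical names chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of late data dropping. All names here are hypothetical;
// this is NOT the Beam API, only an illustration of the idea.
class LateDataModel {
    // Stand-in for "infinite" allowed lateness in this model.
    static final long INFINITE_LATENESS = Long.MAX_VALUE;

    // A message with an event timestamp in milliseconds.
    static final class Event {
        final String payload;
        final long timestampMillis;

        Event(String payload, long timestampMillis) {
            this.payload = payload;
            this.timestampMillis = timestampMillis;
        }
    }

    // End (exclusive) of the fixed window containing a non-negative timestamp.
    static long windowEnd(long timestampMillis, long windowSizeMillis) {
        return (timestampMillis / windowSizeMillis + 1) * windowSizeMillis;
    }

    // An element is dropped when the watermark is already past
    // the end of its window plus the allowed lateness.
    static boolean isDroppedAsLate(Event e, long watermarkMillis,
                                   long windowSizeMillis, long allowedLatenessMillis) {
        long end = windowEnd(e.timestampMillis, windowSizeMillis);
        // Saturating addition so "infinite" lateness does not overflow.
        long expiry = allowedLatenessMillis > Long.MAX_VALUE - end
                ? Long.MAX_VALUE
                : end + allowedLatenessMillis;
        return watermarkMillis > expiry;
    }

    // Returns the elements that survive for a given watermark and lateness.
    static List<Event> keep(List<Event> events, long watermarkMillis,
                            long windowSizeMillis, long allowedLatenessMillis) {
        List<Event> kept = new ArrayList<>();
        for (Event e : events) {
            if (!isDroppedAsLate(e, watermarkMillis, windowSizeMillis, allowedLatenessMillis)) {
                kept.add(e);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 500L),    // window ends at 1000
                new Event("b", 1_500L),  // window ends at 2000
                new Event("c", 9_500L)); // window ends at 10000
        long watermark = 10_000L;
        long windowSize = 1_000L;

        // Zero allowed lateness: "a" and "b" are behind the watermark and dropped.
        System.out.println("kept with zero lateness: "
                + keep(events, watermark, windowSize, 0L).size());
        // Infinite allowed lateness: nothing is dropped.
        System.out.println("kept with infinite lateness: "
                + keep(events, watermark, windowSize, INFINITE_LATENESS).size());
    }
}
```

In this model, zero allowed lateness keeps only one of the three elements, while the infinite setting keeps all three. If the diagnosis above is right, that is the effect the windowing change is meant to have. The nondeterministic loss count between runs would also fit, since which elements fall behind the watermark depends on timing.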
Upvotes: 3