Reputation: 39
I've created a Pipeline to save Google Cloud Pubsub messages into text files using Apache Beam and Java.
Whenever I run the pipeline within Google Dataflow with --runner=DataflowRunner
the messages are saved correctly.
However, when I run the same pipeline with --runner=DirectRunner
the messages are not saved.
I can see the events coming through the pipeline, but no output files are ever written.
Here is the pipeline code:
public static void main(String[] args) {
    ExerciseOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ExerciseOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply("Read Messages from Pubsub",
            PubsubIO
                .readStrings()
                .fromTopic(options.getTopicName()))
        .apply("Set event timestamp", ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
                context.outputWithTimestamp(context.element(), Instant.now());
            }
        }))
        .apply("Windowing", Window.into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("Write to File",
            TextIO
                .write()
                .withWindowedWrites()
                .withNumShards(1)
                .to(options.getOutputPrefix()));

    pipeline.run();
}
What am I doing wrong? Is it possible to run this pipeline locally?
Upvotes: 3
Views: 1094
Reputation: 3445
I was facing the same problem: PubsubIO was not working correctly with the DirectRunner and TextIO. I found a workaround, which is to trigger the window explicitly before the write. For some runners, this also needs the --streaming flag to work; a sketch of setting it in code follows the snippet below.
pipeline
    .apply("2 minutes window",
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(2)))
            .triggering(Repeatedly.forever(AfterFirst.of(
                AfterPane.elementCountAtLeast(10),
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(2)))))
            .withAllowedLateness(Duration.standardSeconds(10))
            .discardingFiredPanes())
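As for the --streaming flag mentioned above, here is a minimal sketch, assuming you'd rather enable streaming mode in code than pass --streaming=true on the command line (ExerciseOptions is the options interface from the question):

ExerciseOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(ExerciseOptions.class);
// Equivalent to passing --streaming=true; StreamingOptions is the
// standard org.apache.beam.sdk.options.StreamingOptions interface.
options.as(StreamingOptions.class).setStreaming(true);
Pipeline pipeline = Pipeline.create(options);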
With this trigger in place (and streaming mode enabled where the runner needs it), the files are written as expected.
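For reference, a sketch of the whole pipeline with the triggered window swapped in for the original "Windowing" step (ExerciseOptions, getTopicName() and getOutputPrefix() come from the question; the trigger classes live in org.apache.beam.sdk.transforms.windowing):

public static void main(String[] args) {
    ExerciseOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(ExerciseOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply("Read Messages from Pubsub",
            PubsubIO.readStrings().fromTopic(options.getTopicName()))
        .apply("Set event timestamp", ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
                context.outputWithTimestamp(context.element(), Instant.now());
            }
        }))
        // The plain FixedWindows step is replaced with the triggered window,
        // so the DirectRunner fires panes and TextIO actually writes files.
        .apply("2 minutes window",
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(2)))
                .triggering(Repeatedly.forever(AfterFirst.of(
                    AfterPane.elementCountAtLeast(10),
                    AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(2)))))
                .withAllowedLateness(Duration.standardSeconds(10))
                .discardingFiredPanes())
        .apply("Write to File",
            TextIO.write()
                .withWindowedWrites()
                .withNumShards(1)
                .to(options.getOutputPrefix()));

    pipeline.run();
}

Note that with discardingFiredPanes() each fired pane is written out separately, so you may see several small files per window rather than exactly one.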
Upvotes: 2