Thiago Chiarato

Reputation: 39

Apache Beam Not Saving Unbounded Data To Text File

I've created a pipeline that saves Google Cloud Pub/Sub messages to text files using Apache Beam and Java. Whenever I run the pipeline on Google Dataflow with --runner=DataflowRunner, the messages are saved correctly.

However, when I run the same pipeline with --runner=DirectRunner, the messages are not saved.

I can see the events flowing through the pipeline, but no files are written.

Here is the pipeline code:

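import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import org.joda.time.Instant;

// ExerciseOptions: custom PipelineOptions interface (not shown) with getTopicName() and getOutputPrefix()
public static void main(String[] args) {
  ExerciseOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ExerciseOptions.class);

  Pipeline pipeline = Pipeline.create(options);

  pipeline
    .apply("Read Messages from Pubsub",
      PubsubIO
        .readStrings()
        .fromTopic(options.getTopicName()))

    // Re-stamp each element with the current processing time
    .apply("Set event timestamp", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext context) {
        context.outputWithTimestamp(context.element(), Instant.now());
      }
    }))

    // Group elements into 5-minute fixed windows (default trigger)
    .apply("Windowing", Window.into(FixedWindows.of(Duration.standardMinutes(5))))

    // Write one file per window
    .apply("Write to File",
      TextIO
        .write()
        .withWindowedWrites()
        .withNumShards(1)
        .to(options.getOutputPrefix()));

  pipeline.run();
}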

What am I doing wrong? Is it possible to run this pipeline locally?
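One thing worth keeping in mind when reading this pipeline is how FixedWindows and Beam's default trigger interact: each element is assigned to a fixed window based on its timestamp, and with the default trigger a window's pane is emitted only once the watermark passes the end of that window. The plain-Java sketch below is a simplified model of that behavior, assuming a zero window offset and non-negative millisecond timestamps. It does not use the Beam API, and the class and method names are hypothetical.

```java
public class FixedWindowSketch {
    // Simplified model of FixedWindows.of(size) with zero offset:
    // an element with timestamp t (ms, assumed non-negative)
    // belongs to the window [start, start + size).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Simplified model of the default trigger: a window's pane is emitted
    // only once the watermark has reached the end of that window.
    static boolean defaultTriggerFires(long windowStartMillis, long sizeMillis, long watermarkMillis) {
        return watermarkMillis >= windowStartMillis + sizeMillis;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long t = 7 * 60 * 1000L; // an element stamped at minute 7
        long start = windowStart(t, fiveMinutes);
        System.out.println("window start (min): " + start / 60000); // prints 5
        // Watermark still at minute 9: the [5, 10) window has not fired yet.
        System.out.println(defaultTriggerFires(start, fiveMinutes, 9 * 60 * 1000L));  // prints false
        // Watermark at minute 10: the window's pane can be emitted.
        System.out.println(defaultTriggerFires(start, fiveMinutes, 10 * 60 * 1000L)); // prints true
    }
}
```

In other words, under this model nothing is written for a window until the watermark moves past its end, however many elements have already arrived.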

Upvotes: 3

Views: 1094

Answers (1)

ineersa

Reputation: 3445

I was facing the same problem: PubsubIO was not working correctly with DirectRunner and TextIO. I found a workaround: add an explicit window trigger before the write. On some runners, this also requires the --streaming flag.

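  pipeline
    .apply("Read Messages from Pubsub",
        PubsubIO.readStrings().fromTopic(options.getTopicName()))
    // Emit a pane every 10 elements or 2 minutes after the pane's first element,
    // whichever comes first, instead of waiting for the watermark
    .apply("2 minutes window",
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(2)))
          .triggering(Repeatedly.forever(AfterFirst.of(
                      AfterPane.elementCountAtLeast(10),
                      AfterProcessingTime
                          .pastFirstElementInPane()
                          .plusDelayOf(Duration.standardMinutes(2)))))
          .withAllowedLateness(Duration.standardSeconds(10))
          .discardingFiredPanes())
    .apply("Write to File",
        TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutputPrefix()));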

This way the files are written as expected.
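The trigger in this workaround fires on element count and processing time rather than on the watermark. The plain-Java sketch below is a simplified, single-window model of Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(...), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...))) combined with discardingFiredPanes(). It ignores the watermark, allowed lateness and window expiry, uses processing-time seconds, and TriggerSketch and paneSizes are hypothetical names, not Beam API.

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerSketch {
    // Simplified single-window model of a repeated AfterFirst trigger:
    // a pane fires when it holds countThreshold elements OR when
    // delaySeconds have passed since the pane's first element.
    // discardingFiredPanes: every fired pane starts the next one empty.
    // Ignores the watermark, allowed lateness and window expiry.
    static List<Integer> paneSizes(long[] arrivals, int countThreshold, long delaySeconds, long endSeconds) {
        List<Integer> panes = new ArrayList<>();
        int pending = 0;        // elements buffered in the current pane
        long firstArrival = -1; // processing time of the pane's first element
        for (long now : arrivals) {
            // The processing-time timer may have expired before this element arrived.
            if (pending > 0 && now >= firstArrival + delaySeconds) {
                panes.add(pending);
                pending = 0;
            }
            if (pending == 0) {
                firstArrival = now; // this element opens a new pane
            }
            pending++;
            // Element-count trigger.
            if (pending >= countThreshold) {
                panes.add(pending);
                pending = 0;
            }
        }
        // The last pane's timer fires even if no further element arrives.
        if (pending > 0 && endSeconds >= firstArrival + delaySeconds) {
            panes.add(pending);
        }
        return panes;
    }

    public static void main(String[] args) {
        long[] arrivals = new long[13];
        for (int i = 0; i < arrivals.length; i++) {
            arrivals[i] = i * 5L; // one message every 5 seconds
        }
        System.out.println(paneSizes(arrivals, 10, 120, 300)); // prints [10, 3]
    }
}
```

With one message every 5 seconds, the first pane fires on the element count, and the remaining three elements are emitted once the 2-minute processing-time delay has elapsed, without any dependence on the watermark.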

Upvotes: 2
