Rakesh Sabbani

Reputation: 1735

GCP - BigQuery to Kafka as streaming

I have a Dataflow application (Java) running in GCP that reads data from a BigQuery table and writes it to Kafka. However, the application runs in batch mode, whereas I would like it to run as a streaming pipeline that continuously reads data from the BigQuery table and writes to the Kafka topic.

BigQuery table: a partitioned table with an insert_time column (the timestamp when the record was inserted into the table) and a message column.

    PCollection<TableRow> tablesRows = BigQueryUtil.readFromTable(pipeline,
            "select message, processed from `myprojectid.mydatasetname.mytablename` " +
                    "where processed = false " +
                    "order by insert_time desc ")
            .apply("Windowing", Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    tablesRows
            .apply("Converting to writable message", ParDo.of(new ProcessRowDoFn()))
            .apply("Writing Messages", KafkaIO.<String, String>write()
                    .withBootstrapServers(bootStrapURLs)
                    .withTopic(options.getKafkaInputTopics())
                    .withKeySerializer(StringSerializer.class)
                    .withValueSerializer(StringSerializer.class)
                    .withProducerFactoryFn(new ProducerFactoryFn(sslConfig, projected)));

    pipeline.run();

Note: I have tried the options below, but no luck yet.

Option 1. I tried setting options.setStreaming(true); the pipeline runs in streaming mode, but it still finishes after the first successful write. A sketch of how that flag is set follows below.
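For reference, a minimal sketch of setting that flag, assuming Beam's standard StreamingOptions interface (the class name and surrounding boilerplate are illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.StreamingOptions;

    public class StreamingFlagSketch {
        public static void main(String[] args) {
            // Request streaming execution. Note: this changes only the execution
            // mode; a bounded BigQuery read still yields a finite PCollection,
            // so the job completes once that input is exhausted.
            StreamingOptions options =
                    PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingOptions.class);
            options.setStreaming(true);
            Pipeline pipeline = Pipeline.create(options);
            // ... attach the BigQuery read and KafkaIO write here ...
            pipeline.run();
        }
    }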

Option 2. Applied a trigger:

    Window.into(FixedWindows.of(Duration.standardMinutes(5)))
            .triggering(AfterWatermark.pastEndOfWindow()
                    .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardDays(2))
            .accumulatingFiredPanes();

Option 3. Forcibly marking the PCollection as unbounded:

    WindowingStrategy<?, ?> windowingStrategy = tablesRows
            .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
            .getWindowingStrategy();

    tablesRows
            .apply("Converting to writable message", ParDo.of(new ProcessRowDoFn()))
            .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);

Any solution is appreciated.

Upvotes: 0

Views: 505

Answers (1)

Jeff Klukas

Reputation: 1357

Some of the advice in Side Input Patterns in the Beam Programming Guide may be helpful here, even though you aren't using this as a side input. In particular, that article discusses using GenerateSequence to periodically emit a value and trigger a read from a bounded source.

This could allow your one-time query to become a repeated query that periodically emits new records. It will be up to your query logic to determine what range of the table to scan on each query, though, and I expect it will be difficult to avoid emitting duplicate records. Hopefully your use case can tolerate that.

Emitting into the global window would look like:

    PCollectionView<Map<String, String>> map =
        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))))
            .apply(Sum.longsGlobally().withoutDefaults())
            .apply(
                ParDo.of(
                    new DoFn<Long, Map<String, String>>() {

                      @ProcessElement
                      public void process(
                          @Element Long input,
                          @Timestamp Instant timestamp,
                          OutputReceiver<Map<String, String>> o) {
                        // Read from BigQuery here and for each row output a record:
                        o.output(PlaceholderExternalService.readTestData(timestamp));
                      }
                    }))
            .apply(
                Window.<Map<String, String>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(View.asSingleton());

This assumes that the size of the query result is relatively small, since the read happens entirely within a DoFn invocation.
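To make that placeholder concrete, here is a minimal sketch of what the read inside the DoFn could look like, using the google-cloud-bigquery client library. The table name, column names, and the 5-second lookback predicate on insert_time are illustrative assumptions that echo the range-scan caveat above, not a tested implementation:

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.FieldValueList;
    import com.google.cloud.bigquery.QueryJobConfiguration;
    import com.google.cloud.bigquery.TableResult;
    import java.util.HashMap;
    import java.util.Map;
    import org.joda.time.Instant;

    public class BigQueryTickReader {

        // Hypothetical stand-in for PlaceholderExternalService.readTestData:
        // queries rows inserted in the last 5 seconds (matching the tick rate)
        // and returns them keyed by insert_time.
        public static Map<String, String> readTestData(Instant timestamp) throws InterruptedException {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            // Joda's Instant.toString() produces an ISO-8601 string that
            // BigQuery's TIMESTAMP() constructor accepts.
            String sql =
                    "SELECT message, CAST(insert_time AS STRING) AS insert_time "
                            + "FROM `myprojectid.mydatasetname.mytablename` "
                            + "WHERE insert_time > TIMESTAMP_SUB(TIMESTAMP('" + timestamp + "'), INTERVAL 5 SECOND)";
            TableResult result = bigquery.query(QueryJobConfiguration.newBuilder(sql).build());
            Map<String, String> rows = new HashMap<>();
            for (FieldValueList row : result.iterateAll()) {
                rows.put(row.get("insert_time").getStringValue(), row.get("message").getStringValue());
            }
            return rows;
        }
    }

Because each tick's lookback is computed from the element timestamp rather than a persisted high-water mark, overlapping scans can still emit duplicates, as noted above.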

Upvotes: 2
