Reputation: 31
I'm pretty much new to Apache Beam, so I'm not 100% sure that what I want to do is possible.
Using the DataflowRunner, I'm trying to read data from a Pub/Sub subscription and from a BigQuery query, window both PCollections, and join them by key with CoGroupByKey.
My problem comes when I try to join both rows: after applying the CoGroupByKey transform, the data never seems to arrive at the same time, even though the windowing strategy is the same (30-second fixed windows, an end-of-window trigger and discarding fired panes).
Some relevant chunks of my code:
/* Getting the data and windowing */
PCollection<PubsubMessage> pubsub = p.apply("ReadPubSub sub",
    PubsubIO.readMessages().fromSubscription(SUB_ALIM_REC));

String query = /* The query */

PCollection<TableRow> bqData = p.apply("Reading BQ",
        BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
    .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardSeconds(30)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardSeconds(0))
        .accumulatingFiredPanes());

PCollection<TableRow> tableRow = pubsub
    .apply(Window.<PubsubMessage>into(FixedWindows.of(Duration.standardSeconds(120)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardSeconds(0))
        .accumulatingFiredPanes())
    .apply("JSON to TableRow", ParDo.of(new ToTableRow()));
/* Join code */
PCollection<TableRow> finalResultCollection =
    kvpCollection.apply("Join TableRows", ParDo.of(
        new DoFn<KV<Long, CoGbkResult>, TableRow>() {
          private static final long serialVersionUID = 6627878974147676533L;

          @ProcessElement
          public void processElement(ProcessContext c) {
            KV<Long, CoGbkResult> e = c.element();
            Long idPaquete = e.getKey();
            Iterable<TableRow> itPaq = e.getValue().getAll(packTag);
            Iterable<TableRow> itAlimRec = e.getValue().getAll(alimTag);
            // Cross-join the rows from both sides that share this key.
            for (TableRow t1 : itPaq) {
              for (TableRow t2 : itAlimRec) {
                TableRow joinedRow = new TableRow();
                /* set the required fields from each collection */
                c.output(joinedRow);
              }
            }
          }
        }));
Also, for the past two days I've been getting this error:
java.io.IOException: Failed to start reading from source: org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2808d228
com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:783)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: BigQuery source must be split before being read
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.createReader(BigQuerySourceBase.java:153)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:463)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:442)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:293)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:286)
com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
I'd really appreciate some guidance on whether what I'm trying to do is possible, or whether there's an alternative way to handle this scenario.
Upvotes: 3
Views: 1499
Reputation: 121
I have tried to do the same thing. As I understand from this question, it's currently impossible. I tried it on my own using PeriodicImpulse, following this example (though I didn't want a side input). I wrote something like the following code and got ValueError: BigQuery source is not currently available for use in streaming pipelines.
segments = (p
    | 'triggering segments fetch' >> PeriodicImpulse()
    | "loading segments" >> beam.io.Read(beam.io.BigQuerySource(
        use_standard_sql=True,
        query=f'''
            SELECT
              id,
              segment
            FROM `some_table`'''))
    | "windowing segments" >> beam.WindowInto(window.FixedWindows(5)))

info = (p
    | "reading info" >> beam.io.ReadFromPubSub(topic='my_test_topic')
    | "parsing info" >> beam.Map(message_to_json)
    | "mapping info" >> beam.Map(lambda x: (x['id'], x['username']))
    | "windowing info" >> beam.WindowInto(window.FixedWindows(5)))

results = ({'segments': segments, 'info': info}
    | beam.CoGroupByKey()
    | "printing" >> beam.Map(print_out))
I think that the best solution right now is to use external storage like Datastore. I have used that approach in another production pipeline and it works great. You can find the explanation here.
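If it helps, here is a rough Java sketch of that lookup pattern, matching the question's pipeline rather than my Python code; the Segment kind, the idPaquete key and the segment property are placeholders, not the actual schema I used:
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import org.apache.beam.sdk.transforms.DoFn;

/* Rough sketch: enrich each streaming TableRow with a Datastore lookup.
   "Segment", "idPaquete" and "segment" are placeholder names. */
class EnrichFromDatastoreFn extends DoFn<TableRow, TableRow> {
  private transient Datastore datastore;

  @Setup
  public void setup() {
    // One client per DoFn instance, created on the worker.
    datastore = DatastoreOptions.getDefaultInstance().getService();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = c.element().clone();
    Key key = datastore.newKeyFactory()
        .setKind("Segment")                             // placeholder kind
        .newKey((String) row.get("idPaquete"));         // placeholder key field
    Entity segment = datastore.get(key);
    if (segment != null) {
      row.set("segment", segment.getString("segment")); // placeholder property
    }
    c.output(row);
  }
}
The streaming side then just applies ParDo.of(new EnrichFromDatastoreFn()) to the Pub/Sub rows, so there is no need to window and join against a bounded BigQuery read.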
Upvotes: 0