Reputation: 48506
I want to read data from BigQuery every hour in Beam, so I wrote the following code, referring to this:
    pipeline.apply("Generate Sequence",
            GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
        .apply(Window.into(FixedWindows.of(Duration.standardHours(1))))
        .apply("Read Row from BQ",
            BigQueryIO.readTableRows()
                .fromQuery("select col1, col2, col3 from project.market.table ")
                .usingStandardSql())
        .apply("Convert Row",
            MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
But it fails with the error "no instance(s) of type variable(s) exist so that PCollection<Long> conforms to PBegin". Is there any way to fix it?
I also tried going through a DoFn, as below. However, I could not find a way to convert TypedRead<TableRow> to MyData.
    pipeline.apply("Generate Sequence",
            GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
        .apply(Window.into(FixedWindows.of(Duration.standardHours(1))))
        .apply("Read Row from BQ query", ParDo.of(new ReadBQParDo()))
        .apply("Map TableRow", ParDo.of(new MapTableRow()));

    static class ReadBQParDo extends DoFn<Long, BigQueryIO.TypedRead<TableRow>> {
        @ProcessElement
        public void processElement(ProcessContext pc) {
            BigQueryIO.TypedRead<TableRow> rows = BigQueryIO.readTableRows()
                .fromQuery("select col1, col2, col3 from project.market.table")
                .usingStandardSql();
            pc.output(rows);
        }
    }

    static class MapTableRow extends DoFn<BigQueryIO.TypedRead<TableRow>, MyData> {
        @ProcessElement
        public void processElement(ProcessContext pc) {
            // stuck here, how to convert TypedRead<TableRow> to MyData??
        }
    }
Beam version: 2.23.0
Upvotes: 0
Views: 1114
Reputation: 5104
In your first snippet, BigQueryIO.readTableRows() must be at the root of the Pipeline, i.e. it is a PTransform<PBegin, ...>. You're trying to apply it to the output of GenerateSequence, which is a PCollection<Long>.
In the second, you can't use PTransforms from within a DoFn (unless you're literally trying to launch a new Beam pipeline for each element).
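To see why the compiler rejects the first snippet, here is a minimal sketch in plain Java with no Beam dependency. The classes PBegin, PCollection, PTransform and Pipeline below are hypothetical toy stand-ins, not the Beam classes; they only mirror the shape of the generic apply signatures. The helpers readRows and mapElements are likewise made up for illustration, standing in for a root read and a per-element map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class RootTransformSketch {
    /** Toy stand-in for the "start of pipeline" marker type. */
    static final class PBegin {}

    /** Toy stand-in for a collection of elements flowing through a pipeline. */
    static final class PCollection<T> {
        final List<T> elements;
        PCollection(List<T> elements) { this.elements = elements; }

        // Only accepts transforms whose input type is (a supertype of) PCollection<T>.
        <OutT> OutT apply(PTransform<? super PCollection<T>, OutT> transform) {
            return transform.expand(this);
        }
    }

    /** Toy stand-in for a transform from InT to OutT. */
    interface PTransform<InT, OutT> { OutT expand(InT input); }

    static final class Pipeline {
        // Only accepts root transforms, i.e. ones whose input type is PBegin.
        <OutT> OutT apply(PTransform<? super PBegin, OutT> root) {
            return root.expand(new PBegin());
        }
    }

    /** Hypothetical root read: its input type is PBegin, like a source. */
    static PTransform<PBegin, PCollection<String>> readRows() {
        return begin -> new PCollection<>(Arrays.asList("a", "bcd"));
    }

    /** Hypothetical element-wise map from PCollection<A> to PCollection<B>. */
    static <A, B> PTransform<PCollection<A>, PCollection<B>> mapElements(Function<A, B> fn) {
        return in -> {
            List<B> out = new ArrayList<>();
            for (A a : in.elements) {
                out.add(fn.apply(a));
            }
            return new PCollection<>(out);
        };
    }

    static List<Integer> run() {
        Pipeline pipeline = new Pipeline();
        // OK: a root transform applied directly to the pipeline.
        PCollection<String> rows = pipeline.apply(readRows());
        // OK: a PCollection-to-PCollection transform applied to a PCollection.
        Function<String, Integer> length = String::length;
        PCollection<Integer> lengths = rows.apply(mapElements(length));
        // Does NOT compile: readRows() expects PBegin, not PCollection<Long>.
        // new PCollection<>(Arrays.asList(0L)).apply(readRows());
        return lengths.elements;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 3]
    }
}
```

Uncommenting the marked line makes the toy fail to compile for the same kind of reason as the question's code: a transform that takes PBegin is being handed a PCollection<Long>. In the real pipeline, the analogous change would be to apply BigQueryIO.readTableRows() directly to pipeline and chain the MapElements step after it; how to then trigger that hourly (for example, by scheduling the job externally) is outside what this sketch covers.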
Upvotes: 2