zangw

Reputation: 48506

BigQueryIO.readTableRows - no instance(s) of type variable(s) exist so that PCollection<Long> conforms to PBegin

I want to read data from BigQuery every hour in Beam, and I wrote the following code based on this:

pipeline.apply("Generate Sequence",
            GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
    .apply(Window.into(FixedWindows.of(Duration.standardHours(1))))
    .apply("Read Row from BQ",                         
             BigQueryIO.readTableRows()
             .fromQuery("select col1, col2, col3 from project.market.table ")
             .usingStandardSql())
    .apply("Convert Row",
            MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow))

But compilation fails with the error "no instance(s) of type variable(s) exist so that PCollection<Long> conforms to PBegin". Is there any way to fix it?

I have also tried doing it through a DoFn, as below. However, I could not find a way to convert TypedRead<TableRow> to MyData.

pipeline.apply("Generate Sequence",
            GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
    .apply(Window.into(FixedWindows.of(Duration.standardHours(1))))
    .apply("Read Row from BQ query", ParDo.of(new ReadBQParDo()))
    .apply("Map TableRow", ParDo.of(new MapTableRow()))

static class ReadBQParDo extends DoFn<Long, BigQueryIO.TypedRead<TableRow>> {
    @ProcessElement
    public void processElement(ProcessContext pc) {
        BigQueryIO.TypedRead<TableRow> rows = BigQueryIO.readTableRows()
                .fromQuery("select col1, col2, col3 from project.market.table")
                .usingStandardSql();
        pc.output(rows);
    }
}

static class MapTableRow extends DoFn<BigQueryIO.TypedRead<TableRow>, MyData> {
    @ProcessElement
    public void processElement(ProcessContext pc) {
        
        // stuck here, how to convert TypedRead<TableRow> to MyData??
    }
}

Beam version: 2.23.0

Upvotes: 0

Views: 1114

Answers (1)

robertwb

Reputation: 5104

In your first snippet, BigQueryIO.readTableRows() must be applied at the root of the Pipeline, i.e. it is a PTransform<PBegin, ...>. You're trying to apply it to the output of GenerateSequence, which is a PCollection<Long>.
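To see why the compiler rejects the first snippet, here is a JDK-only toy model of the relevant type signatures. PTransform, PBegin, PCollection and readRows below are hypothetical stand-ins written for this sketch, not the real org.apache.beam classes; only the shape of the apply(...) methods is mimicked. A source typed PTransform<PBegin, ...> can be applied to the root, but not to a PCollection<Long>.

```java
import java.util.List;

// HYPOTHETICAL, stripped-down model of Beam's typing (not the real Beam classes).
public class RootReadSketch {

    // Like PTransform<InputT, OutputT>: expands an input into an output.
    interface PTransform<InputT, OutputT> {
        OutputT expand(InputT input);
    }

    // Stand-in for PBegin: the "no input yet" marker at the root of a pipeline.
    static final class PBegin {
        <OutputT> OutputT apply(PTransform<? super PBegin, OutputT> t) {
            return t.expand(this);
        }
    }

    // Stand-in for PCollection<T>: only accepts transforms whose input is a PCollection<T>.
    static final class PCollection<T> {
        final List<T> elements;

        PCollection(List<T> elements) {
            this.elements = elements;
        }

        <OutputT> OutputT apply(PTransform<? super PCollection<T>, OutputT> t) {
            return t.expand(this);
        }
    }

    // Hypothetical source whose input type is PBegin, like BigQueryIO.readTableRows().
    static PTransform<PBegin, PCollection<String>> readRows() {
        return begin -> new PCollection<>(List.of("row-1", "row-2"));
    }

    static List<String> rootRead() {
        PBegin root = new PBegin();
        // Compiles: a PBegin-rooted source applied at the root.
        PCollection<String> rows = root.apply(readRows());

        PCollection<Long> ticks = new PCollection<>(List.of(0L, 1L));
        // Would NOT compile, for the same reason as in the question:
        // ticks.apply(readRows());   // PCollection<Long> is not a PBegin
        return rows.elements;
    }

    public static void main(String[] args) {
        System.out.println(rootRead()); // prints [row-1, row-2]
    }
}
```

With the real API, the compiling equivalent is to apply the read directly to the Pipeline object, e.g. pipeline.apply(BigQueryIO.readTableRows().fromQuery(...).usingStandardSql()). Note that this is a bounded read that runs the query once, not once per GenerateSequence tick.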

In the second, you can't use PTransforms from within a DoFn (unless you're literally trying to launch a new Beam pipeline for each element).
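In the second snippet, the DoFn only emits a TypedRead object, which is a description of a transform; nothing reads BigQuery unless that transform is expanded as part of a pipeline. One commonly used alternative (an assumption here, not something stated in the answer) is to skip BigQueryIO inside the DoFn and call a BigQuery client library directly in processElement for each tick, e.g. the google-cloud-bigquery BigQuery.query(QueryJobConfiguration) method. The sketch below shows only the shape of that pattern with the JDK. QueryClient, MyData, processTick and runTicks are hypothetical stand-ins, and the fake client returns fixed rows instead of querying anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// JDK-only sketch of "query the database yourself inside the DoFn".
// QueryClient, MyData, processTick and runTicks are HYPOTHETICAL, not Beam or BigQuery APIs.
public class PerTickQuerySketch {

    // Stand-in for a real BigQuery client: runs SQL, returns rows as column -> value maps.
    interface QueryClient {
        List<Map<String, Object>> query(String sql);
    }

    // Hypothetical version of the question's MyData, keeping two of the three columns.
    static final class MyData {
        final String col1;
        final long col2;

        MyData(String col1, long col2) {
            this.col1 = col1;
            this.col2 = col2;
        }

        static MyData fromRow(Map<String, Object> row) {
            return new MyData((String) row.get("col1"), ((Number) row.get("col2")).longValue());
        }

        @Override
        public String toString() {
            return "MyData(" + col1 + ", " + col2 + ")";
        }
    }

    // Plays the role of DoFn<Long, MyData>.processElement: each tick triggers an actual
    // query call and emits MyData values, instead of emitting an unexpanded TypedRead.
    static List<MyData> processTick(long tick, QueryClient client) {
        List<MyData> out = new ArrayList<>();
        for (Map<String, Object> row : client.query("select col1, col2, col3 from project.market.table")) {
            out.add(MyData.fromRow(row));
        }
        return out;
    }

    // Runs several ticks against a fake client that always returns the same two rows.
    static List<MyData> runTicks(int ticks) {
        QueryClient fake = sql -> List.of(
                Map.<String, Object>of("col1", "a", "col2", 1L),
                Map.<String, Object>of("col1", "b", "col2", 2L));
        List<MyData> all = new ArrayList<>();
        for (long tick = 0; tick < ticks; tick++) {
            all.addAll(processTick(tick, fake));
        }
        return all;
    }

    public static void main(String[] args) {
        // prints [MyData(a, 1), MyData(b, 2), MyData(a, 1), MyData(b, 2)]
        System.out.println(runTicks(2));
    }
}
```

In a real DoFn the client would typically be created once per DoFn instance (for example in a method annotated with @Setup) rather than per element, and the query then runs on the workers once for every GenerateSequence element.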

Upvotes: 2
