Reputation: 143
I have a pipeline that receives some data from Pub/Sub, does some processing, and then needs to process all the data in Bigtable based on the result of that processing.
For example, given a Pub/Sub message like {clientId: 10}, I need to read from Bigtable all the data for clientId 10 (I know how to create the scan based on the clientId). The problem is that both of the Bigtable reads we have at the moment (BigtableIO and CloudBigtableIO) assume the pipeline starts with Bigtable, so I cannot (or could not find a way to) use them in the middle of a pipeline. How can I achieve this?
Simple pseudo-like code:
Pipeline p = Pipeline.create(...)
p.apply(PubsubIO.readMessagesWithAttributes ...)
 .apply(PubsubMessageToScans()) // I know how to do this
 .apply(ReadBigTable())         // How to do this?
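For reference, the scan range I build from the clientId looks roughly like this (a simplified sketch; the `client#<id>#` key layout and the helper names are just illustrative, not my real schema):

```java
// Illustrative sketch: deriving a row-key range for one client's data.
// Key layout "client#<id>#..." is an assumption for this example.
public class ClientScanRange {
    static String startRow(long clientId) {
        return "client#" + clientId + "#";
    }

    static String stopRow(long clientId) {
        // '$' is the byte right after '#', so this stop key bounds
        // every row that starts with the "client#<id>#" prefix.
        return "client#" + clientId + "$";
    }

    public static void main(String[] args) {
        System.out.println(startRow(10) + " .. " + stopRow(10));
    }
}
```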
Upvotes: 2
Views: 1794
Reputation: 1703
UPDATED:
I was recently playing around with Bigtable and Dataflow and ran into the same issue you describe here. I don't believe there is a way to do a Read.from(CloudBigtableIO.read(config)) in the middle of a pipeline, so you'd have to create your own DoFn. You can extend AbstractCloudBigtableTableDoFn and access an easily reusable and configurable Bigtable connection through getConnection(). Here is an example Dataflow/Beam job I put together that shows how to do this:
public class ReadInMiddleOfPipeline {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(GenerateSequence.from(0).to(10).withRate(1, new Duration(1000)))
        .apply(ParDo.of(new ReadFromTableFn(bigtableTableConfig)));
    p.run().waitUntilFinish();
  }

  static class ReadFromTableFn extends AbstractCloudBigtableTableDoFn<Long, Void> {

    public ReadFromTableFn(CloudBigtableConfiguration config) {
      super(config);
    }

    @ProcessElement
    public void processElement(@Element Long input, OutputReceiver<Void> out, PipelineOptions po) {
      BigtableOptions options = po.as(BigtableOptions.class);
      try {
        Table table = getConnection().getTable(TableName.valueOf(options.getBigtableTableId()));
        Scan scan = new Scan().setRowPrefixFilter(Bytes.toBytes("#phone"));
        ResultScanner rows = table.getScanner(scan);
        for (Result row : rows) {
          // Result.getRow() returns just the row key; rawCells()[0].getRowArray()
          // would return the whole backing byte array, not only the key.
          System.out.printf("Reading data for %s%n", Bytes.toString(row.getRow()));
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID; this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}
Upvotes: 2
Reputation: 355
To complement @Billy's answer, you can also connect to Bigtable inside a ParDo transform. The input would be the parameters carried in a PubsubMessage; then, inside the ParDo, set the Scan parameters, open the connection to Bigtable, and obtain the filtered results.
This snippet may be useful:
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> out) {
  String projectId = "<PROJECT_ID>";
  String instanceId = "<INSTANCE_ID>";
  String tableName = "<TABLENAME>";
  // The element is expected to carry "startRow,stopRow".
  String[] scanParameters = element.split(",");
  try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)) {
    Table table = connection.getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes(scanParameters[0]));
    scan.withStopRow(Bytes.toBytes(scanParameters[1]));
    ResultScanner scanner = table.getScanner(scan);
    for (Result row : scanner) {
      System.out.println(row);
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
  out.output("");
}
I didn't test it directly with a PubsubMessage, but you could add another transform to adapt the message, or take the PubsubMessage directly and build the Scan object from it.
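For instance, a small adapter like this could turn the question's {clientId: 10} payload into the comma-separated "start,stop" string the snippet above splits on (the `client#<id>#` key layout and the class/method names here are illustrative assumptions, not anything from your schema):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical adapter from a Pub/Sub payload like "{clientId: 10}"
// to the "startRow,stopRow" string expected by the ParDo above.
public class PubsubToScanString {
    private static final Pattern CLIENT_ID = Pattern.compile("clientId:\\s*(\\d+)");

    static String toScanParams(String payload) {
        Matcher m = CLIENT_ID.matcher(payload);
        if (!m.find()) {
            throw new IllegalArgumentException("no clientId in: " + payload);
        }
        String id = m.group(1);
        // '$' is the byte after '#', so the stop key bounds the prefix range.
        return "client#" + id + "#," + "client#" + id + "$";
    }

    public static void main(String[] args) {
        System.out.println(toScanParams("{clientId: 10}"));
    }
}
```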
Upvotes: 3