Reputation: 2661
I can stream inserts directly into BigQuery at about 10,000 inserts per second, but when I insert through Dataflow the 'ToBQRow' step (shown below) is EXTREMELY slow: barely 50 rows per 10 minutes, and that is with 4 workers. Any idea why? Here's the relevant code:
PCollection<Status> statuses = p
    .apply("GetTweets", PubsubIO.readStrings().fromTopic(topic))
    .apply("ExtractData", ParDo.of(new DoFn<String, Status>() {
        @ProcessElement
        public void processElement(DoFn<String, Status>.ProcessContext c) throws Exception {
            String rowJson = c.element();
            try {
                TweetsWriter.LOGGER.debug("ROWJSON = " + rowJson);
                Status status = TwitterObjectFactory.createStatus(rowJson);
                if (status == null) {
                    TweetsWriter.LOGGER.error("Status is null");
                } else {
                    TweetsWriter.LOGGER.debug("Status value: " + status.getText());
                }
                c.output(status);
                TweetsWriter.LOGGER.debug("Status: " + status.getId());
            } catch (Exception var4) {
                TweetsWriter.LOGGER.error("Status creation from JSON failed: " + var4.getMessage());
            }
        }
    }));

statuses
    .apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            TableRow row = new TableRow();
            Status status = c.element();
            row.set("Id", status.getId());
            row.set("Text", status.getText());
            row.set("RetweetCount", status.getRetweetCount());
            row.set("FavoriteCount", status.getFavoriteCount());
            row.set("Language", status.getLang());
            row.set("ReceivedAt", (Object)null);
            row.set("UserId", status.getUser().getId());
            row.set("CountryCode", status.getPlace().getCountryCode());
            row.set("Country", status.getPlace().getCountry());
            c.output(row);
        }
    }))
    .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)
        .withSchema(schema)
        .withMethod(Method.STREAMING_INSERTS)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

p.run();
Upvotes: 2
Views: 1364
Reputation: 2661
It turns out BigQuery under Dataflow is NOT slow. The problem was that 'status.getPlace().getCountryCode()' was running into a NULL, so it threw a NullPointerException that I couldn't see anywhere in the log! Clearly, Dataflow logging needs to improve. It's running really well now: as soon as a message arrives on the topic, it gets written to BigQuery almost instantaneously!
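For anyone hitting the same thing, here is a rough sketch of the kind of null guard that avoids the exception. It assumes the NULL comes from getPlace() itself, which seems likely for tweets with no place attached but is not confirmed here. To keep it runnable without the twitter4j or Beam jars, the Status and Place classes below are hypothetical stand-ins and a plain Map stands in for TableRow; none of these names are the real library types.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: Status and Place are hypothetical stand-ins for the twitter4j
// types, and a Map stands in for BigQuery's TableRow.
class NullSafeRowSketch {

    static final class Place {
        final String countryCode;
        final String country;

        Place(String countryCode, String country) {
            this.countryCode = countryCode;
            this.country = country;
        }
    }

    static final class Status {
        final long id;
        final String text;
        final Place place; // assumed to be null when the tweet has no place

        Status(long id, String text, Place place) {
            this.id = id;
            this.text = text;
            this.place = place;
        }
    }

    // Builds the row, reading the place once and guarding against null
    // instead of chaining status.place.countryCode directly.
    static Map<String, Object> toRow(Status status) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("Id", status.id);
        row.put("Text", status.text);

        Place place = status.place;
        row.put("CountryCode", place == null ? null : place.countryCode);
        row.put("Country", place == null ? null : place.country);
        return row;
    }

    public static void main(String[] args) {
        Status withPlace = new Status(1L, "hello", new Place("US", "United States"));
        Status withoutPlace = new Status(2L, "no geo", null);

        // Both calls complete; the second one no longer throws.
        System.out.println(toRow(withPlace));
        System.out.println(toRow(withoutPlace));
    }
}
```

In the real 'ToBQRow' DoFn the same pattern would be a local variable holding status.getPlace() followed by a null check before the two row.set calls. The same guard may be worth applying to status.getUser() if that can also be missing.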
Upvotes: 2