Reputation: 2363
I have a batch job in Apache Beam 2.2.0 running on Dataflow in Google Cloud, which interacts with Cloud Bigtable. The job executes correctly on very small test datasets, but it doesn't seem to parallelize well and severely under-utilizes the resources thrown at it.
The aim is roughly the following: starting from an initial keyword, collect partial matches from three tables, look up a record for each match, count the results per key, and write the counts back to Bigtable.
I run this job with 10 n1-standard-1 workers, explicitly disallowing auto-scaling, because otherwise it tries to scale things down. CPU utilization on each worker stays below 10%, and the Bigtable instance seems similarly underutilized (virtually zero activity). My custom counters show a tiny bit of progress, so the job isn't stuck, but it's progressing very slowly.
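For reference, the worker setup above corresponds to roughly the following launch configuration (a sketch; project, region and staging options are omitted):

// Sketch of the worker setup described above: a fixed pool of ten
// n1-standard-1 workers with autoscaling disabled.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setNumWorkers(10);
options.setWorkerMachineType("n1-standard-1");
options.setAutoscalingAlgorithm(
    DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE);
Pipeline p = Pipeline.create(options);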
Here are the snippets of the relevant code:
// Use side inputs to provide the relevant table ID at each step
final PCollectionView<String> tableA =
    p.apply(Create.of("TableA")).apply(View.<String>asSingleton());
final PCollectionView<String> tableB =
    p.apply(Create.of("TableB")).apply(View.<String>asSingleton());
final PCollectionView<String> tableC =
    p.apply(Create.of("TableC")).apply(View.<String>asSingleton());

p.apply(Create.of(inputID.getBytes())) // <- Initial keyword "Bob"
    .apply(ParDo.of(new PartialMatch(configBT, tableA)).withSideInputs(tableA))
    .apply(ParDo.of(new PartialMatch(configBT, tableB)).withSideInputs(tableB))
    .apply(ParDo.of(new PartialMatch(configBT, tableC)).withSideInputs(tableC))
    .apply(ParDo.of(new GetInfo(configBT)))
    .apply(Sum.<String>integersPerKey())
    .apply(ParDo.of(new LogInfo(configBT)));

p.run().waitUntilFinish();
class PartialMatch extends AbstractCloudBigtableTableDoFn<byte[], byte[]>
{
    private static final long serialVersionUID = 1L;
    final PCollectionView<String> m_tableName;
    private Counter m_ct = Metrics.counter(PartialMatch.class, "matched");

    public PartialMatch(CloudBigtableConfiguration config, PCollectionView<String> side1)
    {
        super(config);
        m_tableName = side1;
    }

    @ProcessElement
    public void processElement(DoFn<byte[], byte[]>.ProcessContext c)
    {
        try
        {
            byte[] rowKey = c.element();
            // Prefix scan that fetches only the row keys
            Scan s = new Scan(rowKey);
            FilterList fList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
            fList.addFilter(new PrefixFilter(rowKey));
            fList.addFilter(new KeyOnlyFilter());
            s.setFilter(fList);
            // try-with-resources, so the scanner is closed even if iteration throws
            try (ResultScanner scanner = getConnection()
                    .getTable(TableName.valueOf(c.sideInput(m_tableName)))
                    .getScanner(s))
            {
                for (Result row : scanner)
                {
                    // Emit the second component of the composite row key
                    String[] rowKeyEls = new String(row.getRow()).split(DEF_SPLIT);
                    c.output(rowKeyEls[1].getBytes());
                }
            }
            m_ct.inc();
        } catch (IOException e) { e.printStackTrace(); }
    }
}
class GetInfo extends AbstractCloudBigtableTableDoFn<byte[], KV<String, Integer>>
{
    private static final long serialVersionUID = 1L;
    private Counter m_ct = Metrics.counter(GetInfo.class, "extracted");
    private Counter m_ctFail = Metrics.counter(GetInfo.class, "failed");

    public GetInfo(CloudBigtableConfiguration config)
    {
        super(config);
    }

    @ProcessElement
    public void processElement(DoFn<byte[], KV<String, Integer>>.ProcessContext c)
    {
        try
        {
            byte[] rowKey = c.element();
            Result trnRow = getConnection().getTable(TableName.valueOf(DEF_TBL_ID)).get(new Get(rowKey));
            if (trnRow.isEmpty())
                m_ctFail.inc();
            else
            {
                // Use CellUtil.cloneValue() to extract just the cell value;
                // Cell.getValueArray() returns the entire backing array
                String b = Bytes.toString(CellUtil.cloneValue(trnRow.getColumnLatestCell(DEF_CF, DEF_CN_B)));
                String s = Bytes.toString(CellUtil.cloneValue(trnRow.getColumnLatestCell(DEF_CF, DEF_CN_S)));
                c.output(KV.of(b + DEF_FUSE + s, 1));
                m_ct.inc();
            }
        } catch (IOException e) { e.printStackTrace(); }
    }
}
class LogInfo extends AbstractCloudBigtableTableDoFn<KV<String, Integer>, Integer>
{
    private static final long serialVersionUID = 1L;
    private Counter m_ct = Metrics.counter(LogInfo.class, "logged");

    public LogInfo(CloudBigtableConfiguration config)
    {
        super(config);
    }

    @ProcessElement
    public void processElement(DoFn<KV<String, Integer>, Integer>.ProcessContext c)
    {
        // Writes a single Put per element; try-with-resources closes the table
        try (Table tGraph = getConnection().getTable(TableName.valueOf(DEF.ID_TBL_GRAPH)))
        {
            String name = c.element().getKey();
            Integer ct = c.element().getValue();
            tGraph.put(new Put(name.getBytes()).addColumn(DEF.ID_CF_INF, DEF.ID_CN_CNT, Bytes.toBytes(ct)));
            m_ct.inc();
        } catch (IOException e) { e.printStackTrace(); }
        c.output(0);
    }
}
What could be slowing things down?
Upvotes: 0
Views: 1777
Reputation: 17913
Several things:
- Your DoFn's open the target table (getConnection().getTable(...)) for every element. Consider opening it once per instance in @Setup and closing it in @Teardown (see the first sketch below).
- Create.of(inputID.getBytes()) yields a single-element PCollection, and Dataflow fuses consecutive ParDo's together, so the whole fan-out runs on one worker. Insert a Reshuffle.viaRandomKey() between your ParDo's to break fusion and let the expanded output be redistributed across workers (see the second sketch below).
- LogInfo issues one Put per element; writes to Bigtable should be batched, and BigtableIO.write() does this for you, so I recommend you use it instead of hand-crafted code.
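A minimal sketch of the first point, written as a plain DoFn that manages its own connection (BigtableConfiguration.connect and the PROJECT_ID/INSTANCE_ID constants are placeholders; the AbstractCloudBigtableTableDoFn base class in the question may already cache the connection itself):

// Sketch only: the table is opened once per DoFn instance, not per element.
class LogInfo extends DoFn<KV<String, Integer>, Integer>
{
    private static final long serialVersionUID = 1L;
    private transient Connection m_conn;
    private transient Table m_tGraph;

    @Setup
    public void openTable() throws IOException
    {
        m_conn = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID); // placeholders
        m_tGraph = m_conn.getTable(TableName.valueOf(DEF.ID_TBL_GRAPH));
    }

    @Teardown
    public void closeTable() throws IOException
    {
        if (m_tGraph != null) m_tGraph.close();
        if (m_conn != null) m_conn.close();
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException
    {
        m_tGraph.put(new Put(c.element().getKey().getBytes())
            .addColumn(DEF.ID_CF_INF, DEF.ID_CN_CNT, Bytes.toBytes(c.element().getValue())));
        c.output(0);
    }
}

And a rough sketch of the reshuffled pipeline with BigtableIO.write() in place of LogInfo. BigtableIO.write() consumes KV<ByteString, Iterable<Mutation>> elements (com.google.bigtable.v2.Mutation protos); the column family "inf", qualifier "cnt" and table ID "graph" here are placeholders:

// Sketch only: break fusion with Reshuffle, batch the writes with BigtableIO.
p.apply(Create.of(inputID.getBytes()))
    .apply(ParDo.of(new PartialMatch(configBT, tableA)).withSideInputs(tableA))
    .apply(Reshuffle.<byte[]>viaRandomKey()) // redistribute the fan-out
    .apply(ParDo.of(new PartialMatch(configBT, tableB)).withSideInputs(tableB))
    .apply(Reshuffle.<byte[]>viaRandomKey())
    .apply(ParDo.of(new PartialMatch(configBT, tableC)).withSideInputs(tableC))
    .apply(Reshuffle.<byte[]>viaRandomKey())
    .apply(ParDo.of(new GetInfo(configBT)))
    .apply(Sum.<String>integersPerKey())
    // Convert each count into the KV<ByteString, Iterable<Mutation>> form
    // that BigtableIO.write() expects
    .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<ByteString, Iterable<Mutation>>>()
    {
        @ProcessElement
        public void processElement(ProcessContext c)
        {
            Mutation m = Mutation.newBuilder()
                .setSetCell(Mutation.SetCell.newBuilder()
                    .setFamilyName("inf")
                    .setColumnQualifier(ByteString.copyFromUtf8("cnt"))
                    .setTimestampMicros(-1) // -1 = server-assigned timestamp
                    .setValue(ByteString.copyFrom(Bytes.toBytes(c.element().getValue()))))
                .build();
            Iterable<Mutation> muts = Collections.singletonList(m);
            c.output(KV.of(ByteString.copyFromUtf8(c.element().getKey()), muts));
        }
    }))
    .apply(BigtableIO.write()
        .withBigtableOptions(new BigtableOptions.Builder()
            .setProjectId(PROJECT_ID)   // placeholder
            .setInstanceId(INSTANCE_ID) // placeholder
            .build())
        .withTableId("graph"));

Upvotes: 2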