VS_FF

Reputation: 2363

DataFlow Batch Job Slow / Not Parallelizing Well

I have a batch job in Apache Beam 2.2.0 running on Dataflow in Google Cloud that interacts with Bigtable. The job executes correctly on very small test datasets, but it doesn't parallelize well and severely under-utilizes the resources thrown at it.

The aim is roughly to achieve the following:

I run this job with 10 n1-standard-1 workers and explicitly disallow auto-scaling, because otherwise it tries to scale the job down. CPU utilization on each worker is below 10%, and the Bigtable instance seems similarly under-utilized (virtually zero activity). My custom counters show a little progress, so the job isn't stuck, but it is working very slowly.
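
For completeness, the worker pool is pinned roughly like this (project and temp-location values below are placeholders):

    DataflowPipelineOptions options =
            PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    options.setProject("my-project");               // placeholder
    options.setTempLocation("gs://my-bucket/tmp");  // placeholder
    options.setWorkerMachineType("n1-standard-1");
    options.setNumWorkers(10);
    options.setAutoscalingAlgorithm(
            DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE);
    Pipeline p = Pipeline.create(options);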

Here are the snippets of the relevant code:

    // Use side inputs to provide the relevant Table ID at each step
    final PCollectionView<String> tableA =
            p.apply(Create.of("TableA")).apply(View.<String>asSingleton());
    final PCollectionView<String> tableB =
            p.apply(Create.of("TableB")).apply(View.<String>asSingleton());
    final PCollectionView<String> tableC =
            p.apply(Create.of("TableC")).apply(View.<String>asSingleton());

    p.apply(Create.of(inputID.getBytes()))  // <- Initial keyword "Bob"
            .apply(ParDo.of(new PartialMatch(configBT, tableA))
                    .withSideInputs(tableA))
            .apply(ParDo.of(new PartialMatch(configBT, tableB))
                    .withSideInputs(tableB))
            .apply(ParDo.of(new PartialMatch(configBT, tableC))
                    .withSideInputs(tableC))
            .apply(ParDo.of(new GetInfo(configBT)))
            .apply(Sum.<String>integersPerKey())
            .apply(ParDo.of(new LogInfo(configBT)));
    p.run().waitUntilFinish();


    // Prefix-scans the side-input table for the element's row key and emits the second segment of each matching row key.
    class PartialMatch extends AbstractCloudBigtableTableDoFn<byte[], byte[]>
    {   
        private static final long serialVersionUID = 1L;
        final PCollectionView<String> m_tableName;
        private Counter m_ct = Metrics.counter(PartialMatch.class, "matched");

        public PartialMatch(CloudBigtableConfiguration config,PCollectionView<String> side1)
        {
            super(config);
            m_tableName = side1;
        }   

        @ProcessElement
        public void processElement(DoFn<byte[], byte[]>.ProcessContext c)
        {
            try
            {
                byte rowKey[] = c.element();
                Scan s = new Scan(rowKey);
                FilterList fList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
                fList.addFilter(new PrefixFilter(rowKey));
                fList.addFilter(new KeyOnlyFilter());
                s.setFilter(fList); 

                ResultScanner scanner = getConnection().getTable(TableName.valueOf(c.sideInput(m_tableName))).getScanner(s);

                for (Result row : scanner)
                {
                    String rowKeyEls[] = new String(row.getRow()).split(DEF_SPLIT);
                    c.output(rowKeyEls[1].getBytes());
                }
                scanner.close();
                m_ct.inc();
            } catch (IOException e){e.printStackTrace();}
        }
    }

    // Looks up the element's row key in DEF_TBL_ID and emits (b + DEF_FUSE + s, 1) built from the latest DEF_CN_B / DEF_CN_S cells.
    class GetInfo extends AbstractCloudBigtableTableDoFn<byte[], KV<String, Integer>>
    {   
        private static final long serialVersionUID = 1L;
        private Counter m_ct = Metrics.counter(GetInfo.class, "extracted");
        private Counter m_ctFail = Metrics.counter(GetInfo.class, "failed");

        public GetInfo(CloudBigtableConfiguration config)
        {
            super(config);
        }   

        @ProcessElement
        public void processElement(DoFn<byte[], KV<String, Integer>>.ProcessContext c)
        {
            try
            {
                byte rowKey[] = c.element();

                Result trnRow = getConnection().getTable(TableName.valueOf(DEF_TBL_ID)).get(new Get(rowKey));
                if(trnRow.isEmpty())
                    m_ctFail.inc();
                else
                {                               
                    String b = new String(trnRow.getColumnLatestCell(DEF_CF, DEF_CN_B).getValueArray());
                    String s = new String(trnRow.getColumnLatestCell(DEF_CF,DEF_CN_S).getValueArray());
                    c.output(KV.of(b + DEF_FUSE + s, 1));
                    m_ct.inc();
                }
            } catch (IOException e){e.printStackTrace();}
        }
    }

    // Writes each (name, count) pair to the graph table with a single Put per element.
    class LogInfo extends AbstractCloudBigtableTableDoFn<KV<String, Integer>, Integer>
    {   
        private static final long serialVersionUID = 1L;
        private Counter m_ct = Metrics.counter(LogInfo.class, "logged");

        public LogInfo(CloudBigtableConfiguration config)
        {
            super(config);
        }   

        @ProcessElement
        public void processElement(DoFn<KV<String, Integer>, Integer>.ProcessContext c)
        {
            try
            {
                Table tGraph = getConnection().getTable(TableName.valueOf(DEF.ID_TBL_GRAPH));

                String name = c.element().getKey();
                Integer ct = c.element().getValue();
                tGraph.put(new Put(name.getBytes()).addColumn(DEF.ID_CF_INF, DEF.ID_CN_CNT, Bytes.toBytes(ct)));
                m_ct.inc();
            }catch (IOException e){e.printStackTrace();}
            c.output(0);
        }
    }

What could be slowing things down?

Upvotes: 0

Views: 1777

Answers (1)

jkff

Reputation: 17913

Several things.

  • You're creating a separate connection to Bigtable for every single element processed by every function. Instead, create the connection in @Setup and close it in @Teardown (see the sketch after this list).
  • The code currently isn't closing those connections at all, so they leak, which could be slowing things down as well.
  • Your pipeline is a straight line of ParDos, so they are most likely all fused together and the job suffers from excessive fusion. See the recent answer How can I maximize throughput for an embarrassingly-parallel task in Python on Google Cloud Platform?. In Java, you can insert Reshuffle.viaRandomKey() between your ParDos, as illustrated below.
  • Your pipeline uses hand-crafted code to write mutations to Bigtable one by one. This is inefficient; mutations should be batched to maximize throughput. BigtableIO.write() does this for you, so I recommend using it instead of the hand-crafted code (also shown below).
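
For the first two points, here is a rough sketch of what connection handling in @Setup / @Teardown can look like (a plain DoFn rather than your exact classes; BigtableConfiguration.connect() from the cloud-bigtable-hbase client is one way to build the HBase Connection, and the project/instance/table IDs are placeholders):

    // Imports assumed: org.apache.beam.sdk.transforms.DoFn,
    // org.apache.hadoop.hbase.client.{Connection, Get, Result, Table},
    // org.apache.hadoop.hbase.TableName, com.google.cloud.bigtable.hbase.BigtableConfiguration
    class LookupFn extends DoFn<byte[], byte[]>
    {
        private static final long serialVersionUID = 1L;

        private final String projectId;    // placeholder configuration passed in by the caller
        private final String instanceId;
        private final String tableId;

        // Not serializable, so created on the worker instead of being shipped with the DoFn.
        private transient Connection connection;

        LookupFn(String projectId, String instanceId, String tableId)
        {
            this.projectId = projectId;
            this.instanceId = instanceId;
            this.tableId = tableId;
        }

        @Setup
        public void setup()
        {
            // One connection per DoFn instance, reused across bundles and elements.
            connection = BigtableConfiguration.connect(projectId, instanceId);
        }

        @ProcessElement
        public void processElement(ProcessContext c) throws IOException
        {
            try (Table table = connection.getTable(TableName.valueOf(tableId)))
            {
                Result row = table.get(new Get(c.element()));
                if (!row.isEmpty())
                    c.output(row.getRow());
            }
        }

        @Teardown
        public void teardown() throws IOException
        {
            // Close the connection when the worker retires this DoFn instance.
            if (connection != null)
                connection.close();
        }
    }

For the last two points, the pipeline could be reshaped roughly like this. Mutation and ByteString here are com.google.bigtable.v2.Mutation and com.google.protobuf.ByteString (what BigtableIO.write() expects), not the HBase classes, and the project/instance/table IDs and the "inf"/"cnt" family/qualifier are placeholders:

    // Converts each per-key count into the KV<ByteString, Iterable<Mutation>> that BigtableIO.write() consumes.
    class ToGraphMutation extends DoFn<KV<String, Integer>, KV<ByteString, Iterable<Mutation>>>
    {
        private static final long serialVersionUID = 1L;

        @ProcessElement
        public void processElement(ProcessContext c)
        {
            Mutation setCell = Mutation.newBuilder()
                    .setSetCell(Mutation.SetCell.newBuilder()
                            .setFamilyName("inf")                                // placeholder column family
                            .setColumnQualifier(ByteString.copyFromUtf8("cnt"))  // placeholder qualifier
                            .setValue(ByteString.copyFrom(Bytes.toBytes(c.element().getValue())))
                            .build())
                    .build();
            Iterable<Mutation> mutations = Collections.singletonList(setCell);  // java.util.Collections
            c.output(KV.of(ByteString.copyFromUtf8(c.element().getKey()), mutations));
        }
    }

    p.apply(Create.of(inputID.getBytes()))
            .apply(ParDo.of(new PartialMatch(configBT, tableA)).withSideInputs(tableA))
            .apply(Reshuffle.<byte[]>viaRandomKey())   // break fusion so the fan-out can parallelize
            .apply(ParDo.of(new PartialMatch(configBT, tableB)).withSideInputs(tableB))
            .apply(Reshuffle.<byte[]>viaRandomKey())
            .apply(ParDo.of(new PartialMatch(configBT, tableC)).withSideInputs(tableC))
            .apply(Reshuffle.<byte[]>viaRandomKey())
            .apply(ParDo.of(new GetInfo(configBT)))
            .apply(Sum.<String>integersPerKey())
            .apply(ParDo.of(new ToGraphMutation()))
            .apply(BigtableIO.write()                  // batches mutations instead of one Put per element
                    .withProjectId("my-project")       // placeholders
                    .withInstanceId("my-instance")
                    .withTableId("my-graph-table"));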

Upvotes: 2
