Sunil
Reputation: 431

Batch vs Streaming Performance in Google Cloud Dataflow

We have bounded data, around 3.5 million records in BigQuery. This data needs to be processed using Dataflow (mostly external API calls plus transformations).

From the documentation: https://cloud.google.com/dataflow/docs/resources/faq#beam-java-sdk

I see that batch mode uses a single thread and streaming uses 300 threads per worker. For us, most of the work is network bound because of the external API calls.

  1. Considering this, which one would be more performant and cost efficient: batch with x workers, or streaming with x workers and 300 threads each?

  2. If it is streaming, should I send the data that is in BigQuery to Pub/Sub? Is my understanding correct?

Upvotes: 0

Views: 1509

Answers (2)

robertwb
Reputation: 5104

Here is an example of a DoFn that processes multiple items concurrently:

  import queue
  import threading

  import apache_beam as beam

  class MultiThreadedDoFn(beam.DoFn):
    def __init__(self, func, num_threads=10):
      self.func = func
      self.num_threads = num_threads

    def setup(self):
      self.done = False
      self.input_queue = queue.Queue(2)
      self.output_queue = queue.Queue()
      self.threads = [
          threading.Thread(target=self.work, daemon=True)
          for _ in range(self.num_threads)]
      for t in self.threads:
        t.start()

    def work(self):
      while not self.done:
        try:
          windowed_value = self.input_queue.get(timeout=0.1)
          self.output_queue.put(
              windowed_value.with_value(self.func(windowed_value.value)))
        except queue.Empty:
          pass  # check self.done

    def start_bundle(self):
      self.pending = 0

    def process(self, element,
                timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam):
      self.pending += 1
      self.input_queue.put(
          beam.transforms.window.WindowedValue(
              element, timestamp, (window,)))
      try:
        while not self.output_queue.empty():
          yield self.output_queue.get(block=False)
          self.pending -= 1
      except queue.Empty:
        pass

    def finish_bundle(self):
      while self.pending > 0:
        yield self.output_queue.get()
        self.pending -= 1

    def teardown(self):
      self.done = True
      for t in self.threads:
        t.join()

It can be used as

  import logging
  import time

  def func(n):
    time.sleep(n / 10)  # simulate a slow, I/O-bound call
    return n + 1

  with beam.Pipeline() as p:
    (p | beam.Create([1, 3, 5, 7] * 10 + [9])
       | beam.ParDo(MultiThreadedDoFn(func))
       | beam.Map(logging.error))
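
The reason this helps is that a thread blocked on a network call leaves the CPU idle, so several calls can be in flight at once. As a rough standalone illustration of that idea outside Beam, here is a minimal sketch using the standard library's `ThreadPoolExecutor`. The names `fake_api_call` and `process_batch` are hypothetical, and the 50 ms sleep is only an assumed stand-in for a real external API call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_api_call(n):
    # Hypothetical stand-in for a network-bound external API call.
    time.sleep(0.05)
    return n + 1


def process_batch(items, num_threads=10):
    # executor.map preserves input order and waits for every call to finish.
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        return list(executor.map(fake_api_call, items))


start = time.monotonic()
results = process_batch(list(range(20)))
elapsed = time.monotonic() - start
print(results, "in %.2f seconds" % elapsed)
```

Run one at a time, the 20 calls would take about a second. With 10 threads they overlap, so the wall time should be much lower. Actual gains depend on the API's latency and rate limits.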

Upvotes: 0

Bruno Volpato
Reputation: 1428

The batch vs. streaming decision usually follows from the source you are reading from (bounded vs. unbounded). When reading with BigQueryIO, the source is bounded.

There are ways to convert a BoundedSource to an UnboundedSource (see "Using custom DataFlow unbounded source on DirectPipelineRunner"), but I don't see it recommended anywhere, and I am not sure you would get any benefit from it. Streaming has to keep track of checkpoints and watermarks, which could add overhead for your workers.

Upvotes: 2
