Reputation: 431
We have bounded data, around 3.5 million records, in BigQuery. This data needs to be processed using Dataflow (mostly external API calls plus some transformations).
From the document https://cloud.google.com/dataflow/docs/resources/faq#beam-java-sdk
I see that batch mode uses a single thread per worker, while streaming mode uses 300 threads per worker. In our case, most of the work is network-bound because of the external API calls.
Given this, which would be more performant and cost-efficient: batch with x workers, or streaming with x workers and 300 threads each?
If streaming is the way to go, should I push the data that is in BigQuery to Pub/Sub first? Is my understanding correct?
Upvotes: 0
Views: 1509
Reputation: 5104
Here is an example of a DoFn that processes multiple items concurrently:
import queue
import threading

import apache_beam as beam


class MultiThreadedDoFn(beam.DoFn):
  def __init__(self, func, num_threads=10):
    self.func = func
    self.num_threads = num_threads

  def setup(self):
    self.done = False
    self.input_queue = queue.Queue(2)
    self.output_queue = queue.Queue()
    self.threads = [
        threading.Thread(target=self.work, daemon=True)
        for _ in range(self.num_threads)]
    for t in self.threads:
      t.start()

  def work(self):
    # Worker loop: pull an element, apply the function, push the result.
    while not self.done:
      try:
        windowed_value = self.input_queue.get(timeout=0.1)
        self.output_queue.put(
            windowed_value.with_value(self.func(windowed_value.value)))
      except queue.Empty:
        pass  # periodically re-check self.done

  def start_bundle(self):
    self.pending = 0

  def process(self, element,
              timestamp=beam.DoFn.TimestampParam,
              window=beam.DoFn.WindowParam):
    self.pending += 1
    self.input_queue.put(
        beam.transforms.window.WindowedValue(
            element, timestamp, (window,)))
    # Drain whatever results are already available.
    try:
      while not self.output_queue.empty():
        yield self.output_queue.get(block=False)
        self.pending -= 1
    except queue.Empty:
      pass

  def finish_bundle(self):
    # Block until every in-flight element has been processed.
    while self.pending > 0:
      yield self.output_queue.get()
      self.pending -= 1

  def teardown(self):
    self.done = True
    for t in self.threads:
      t.join()
It can be used as
import logging
import time

def func(n):
  time.sleep(n / 10)  # stand-in for a slow external API call
  return n + 1

with beam.Pipeline() as p:
  (p
   | beam.Create([1, 3, 5, 7] * 10 + [9])
   | beam.ParDo(MultiThreadedDoFn(func))
   | beam.Map(logging.error))
Upvotes: 0
Reputation: 1428
The batch vs. streaming decision usually comes from the source you are reading from (bounded vs. unbounded). When reading via BigQueryIO, the source is bounded.
There are ways to convert a BoundedSource into an UnboundedSource (see Using custom DataFlow unbounded source on DirectPipelineRunner), but I don't see it recommended anywhere, and I am not sure you would gain anything from it. Streaming has to keep track of checkpoints and watermarks, which adds overhead on your workers.
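Since your work is network-bound, you can usually get the throughput benefit you are after while staying in batch mode by overlapping the API calls with threads inside each worker, rather than switching to streaming. A minimal stdlib sketch of the idea, where `call_api` is a hypothetical stand-in for your external API call:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def call_api(record):
    # Hypothetical stand-in for a slow, network-bound external API call.
    time.sleep(0.1)
    return {"id": record, "status": "ok"}

records = list(range(20))

# Sequential: roughly 20 * 0.1s = ~2s of wall time.
start = time.perf_counter()
sequential = [call_api(r) for r in records]
sequential_secs = time.perf_counter() - start

# With 10 threads the calls overlap while waiting on I/O: ~0.2s of wall time.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=10) as pool:
    concurrent = list(pool.map(call_api, records))
concurrent_secs = time.perf_counter() - start

print(f"sequential: {sequential_secs:.2f}s, threaded: {concurrent_secs:.2f}s")
```

The same pattern inside a DoFn (as in the other answer) gives batch workers streaming-like concurrency without the checkpoint/watermark overhead.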
Upvotes: 2