I am trying to see whether it is possible to send the elements of a PCollection back to the parent process when using the DirectRunner from Python's Apache Beam SDK.
However, I ran into some strange behavior: everything works when the queue is instantiated and the pipeline is run inside the __main__ section of the script, but not when the same code is called inside a function. I am guessing this is due to some pickling/dilling going on behind the scenes, but a more concrete explanation would be appreciated.
The /tmp/inputs/winterstale.txt file used below can be downloaded from: https://storage.googleapis.com/apache-beam-samples/shakespeare/winterstale.txt
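If the file is not already in place, a small helper along these lines can fetch it before the pipeline runs. This is a sketch that assumes Python 3's urllib.request; fetch_input, INPUT_URL and INPUT_PATH are hypothetical names, and the helper skips the download when the destination file already exists.

```python
import os
import urllib.request

# Hypothetical constants: the URL and path quoted in the question.
INPUT_URL = "https://storage.googleapis.com/apache-beam-samples/shakespeare/winterstale.txt"
INPUT_PATH = "/tmp/inputs/winterstale.txt"


def fetch_input(url=INPUT_URL, dest=INPUT_PATH):
    """Hypothetical helper: download url to dest unless dest already exists."""
    if not os.path.exists(dest):
        # Create /tmp/inputs (or whatever the parent directory is) if needed.
        os.makedirs(os.path.dirname(dest), exist_ok=True)
        urllib.request.urlretrieve(url, dest)
    return dest
```

Calling fetch_input() once at the top of the __main__ block is enough; later runs reuse the local copy.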
```python
from __future__ import print_function

import atexit
import queue
import tempfile
import time
import unittest

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
from apache_beam.runners.interactive.cache_manager import FileBasedCacheManager
from apache_beam.runners.interactive.cache_manager import ReadCache
from apache_beam.runners.interactive.cache_manager import WriteCache


def add_to_queue(element, queue):
    queue.put(element)


def write_to_queue():
    q = queue.Queue()
    with beam.Pipeline(runner=BundleBasedDirectRunner()) as p:
        _ = (
            p
            | "Read" >> beam.io.ReadFromText("/tmp/inputs/winterstale.txt")
            | "Remove whitespace" >> beam.Map(lambda element: element.strip("\n\t|"))
            | "Remove empty lines" >> beam.FlatMap(lambda element: [element] if element else [])
            | "Write" >> beam.Map(lambda element: add_to_queue(element, queue=q))
        )
    return list(q.queue)


if __name__ == "__main__":
    cache_location = tempfile.mkdtemp()
    atexit.register(FileSystems.delete, [cache_location])

    # Using a function call
    cache_manager = FileBasedCacheManager(cache_dir=cache_location)
    result1 = write_to_queue()
    print(len(result1))  # >>> prints "0" <<<

    # Copy-pasting the code from "write_to_queue()"
    q = queue.Queue()
    with beam.Pipeline(runner=BundleBasedDirectRunner()) as p:
        _ = (
            p
            | "Read" >> beam.io.ReadFromText("/tmp/inputs/winterstale.txt")
            | "Remove whitespace" >> beam.Map(lambda element: element.strip("\n\t|"))
            | "Remove empty lines" >> beam.FlatMap(lambda element: [element] if element else [])
            | "Write" >> beam.Map(lambda element: add_to_queue(element, queue=q))
        )
    result2 = list(q.queue)
    print(len(result2))  # >>> prints "3561" <<<
```
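One concrete difference between the two cases, shown in plain Python without Beam: inside write_to_queue() the "Write" lambda captures q in a closure cell, while in the __main__ block the same lambda refers to q as a module-level global that is looked up by name. A pickler that serializes closure cells by value would hand the runner a copy of the queue in the first case. Whether Beam's pickler keeps a by-name reference to __main__ globals in the second case, so that the original queue is still reached, is an unverified hypothesis. The names below are hypothetical and use lists instead of queues.

```python
# Hypothetical names; these mirror the two cases in the question.
def make_writer_in_function():
    q = []  # local to the function, like q inside write_to_queue()
    return lambda element: q.append(element)


q = []  # module-level global, like q in the question's __main__ block
module_level_writer = lambda element: q.append(element)

in_function_writer = make_writer_in_function()

# The function-scoped lambda carries q with it as a free variable in a closure cell.
print(in_function_writer.__code__.co_freevars)  # ('q',)
print(in_function_writer.__closure__ is not None)  # True

# The module-level lambda has no closure; q is resolved through globals by name.
print(module_level_writer.__closure__)  # None
print("q" in module_level_writer.__code__.co_names)  # True
```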
In general, everything is pickled before it is sent to the runner. In this case the queue object itself would typically be pickled, and your elements appended to an unpickled copy during execution (hence the 0 return value). I think what is going on here is that BundleBasedDirectRunner is inconsistent about what it pickles: for example, if there were pickling errors earlier (due to closures capturing objects from the main session), it might abandon all pickling attempts and continue with the original objects.
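The copy effect described above can be reproduced without Beam. This sketch is a simplification: it uses the standard pickle module on a plain list rather than Beam's own pickler on a queue.Queue, and run_serialized, run_in_place and add_to_sink are hypothetical names standing in for "the runner ships a pickled copy" versus "the runner falls back to the original object".

```python
import pickle


def add_to_sink(element, sink):
    # Hypothetical stand-in for add_to_queue() in the question.
    sink.append(element)


def run_serialized(sink, elements):
    # A serializing runner works on an unpickled copy of the captured state.
    worker_sink = pickle.loads(pickle.dumps(sink))
    for element in elements:
        add_to_sink(element, worker_sink)


def run_in_place(sink, elements):
    # A runner that skips serialization mutates the caller's object directly.
    for element in elements:
        add_to_sink(element, sink)


results = []
run_serialized(results, ["a", "b", "c"])
print(len(results))  # 0: only the copy was filled
run_in_place(results, ["a", "b", "c"])
print(len(results))  # 3
```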
It might be worth trying a different runner, in which case the behavior should be consistent (probably always zero), and any pickling error would be raised informatively rather than suppressed.
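As a small illustration of the kind of error that can get swallowed: the standard pickle module refuses a queue.Queue outright, because the queue holds a threading lock. Beam's pickler (dill-based in many Beam versions) may handle locks differently, so this shows only the stdlib behavior; try_pickle is a hypothetical helper.

```python
import pickle
import queue


def try_pickle(obj):
    """Hypothetical helper: None if obj pickles with the stdlib pickler, else the error text."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError) as exc:
        return "{}: {}".format(type(exc).__name__, exc)
    return None


print(try_pickle([1, 2, 3]))  # None
print(try_pickle(queue.Queue()))  # an error message mentioning the lock
```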