Reputation: 9
We're trying to connect to an Oracle database using GCP Dataflow and Python job templates. Because our Dataflow jobs run in a special subnetwork that doesn't have internet access, we install the dependency packages from a GCS bucket using setup.py.
Below is the command line used to create the Dataflow template with setup.py:
```
python3 -m <python_file_name> \
    --runner DataflowRunner \
    --project <project_id> \
    --staging_location <gcs_staging> \
    --temp_location <gcs_temp> \
    --template_location <gcs_template> \
    --region <region> \
    --setup_file=./setup.py
```
The dependency packages are stored in a GCS bucket and are copied to and installed on the Dataflow workers when a job runs. For the Oracle database connection we use oracledb-1.0.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl, which was downloaded from https://pypi.org/project/oracledb/#files.
When we test with Cloud Shell and the DirectRunner, the oracledb module is installed and recognized successfully. However, when the Dataflow job executes, it hits the error below:
```
Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/dataflow_worker/batchworker.py", line 772, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.9/site-packages/dataflow_worker/batchworker.py", line 509, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.9/site-packages/apache_beam/internal/pickler.py", line 65, in load_session
    return desired_pickle_lib.load_session(file_path)
  File "/usr/local/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py", line 313, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 368, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'oracledb'
```
Many thanks in advance for your advice.
setup.py
```
import os
import logging
import subprocess
import pickle

import setuptools
import distutils
from setuptools.command.install import install as _install


class install(_install):  # pylint: disable=invalid-name
    def run(self):
        self.run_command('CustomCommands')
        _install.run(self)


WHEEL_PACKAGES = [
    'wheel-0.37.1-py2.py3-none-any.whl',
    'oracledb-1.0.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl'
]

CUSTOM_COMMANDS = [
    ['sudo', 'apt-get', 'update']
]


class CustomCommands(setuptools.Command):
    """A setuptools Command class able to run arbitrary commands."""

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def run_command(self, command):
        import subprocess
        import logging
        logging.getLogger().setLevel(logging.INFO)
        status = -9999
        try:
            logging.info('CUSTOM_DATAFLOW_JOB_LOG: started running [{}]'.format(command))
            status = subprocess.call(command)
            if status == 0:
                logging.info('CUSTOM_DATAFLOW_JOB_LOG: [{}] completed successfully'.format(command))
            else:
                logging.error('CUSTOM_DATAFLOW_JOB_LOG: [{}] failed with signal {}'.format(command, status))
        except Exception as e:
            logging.error('CUSTOM_DATAFLOW_JOB_LOG: [{}] caught exception: {}'.format(command, e))
        return status

    def install_cmd(self):
        result = []
        for p in WHEEL_PACKAGES:
            result.append(['gsutil', 'cp', 'gs://dataflow-execution/python_dependencies/{}'.format(p), '.'])
            result.append(['pip', 'install', '{}'.format(p)])
        return result

    def run(self):
        import logging
        logging.getLogger().setLevel(logging.INFO)
        try:
            install_cmd = self.install_cmd()
            for command in CUSTOM_COMMANDS:
                status = self.run_command(command)
                if status == 0:
                    logging.info('CUSTOM_DATAFLOW_JOB_LOG: [{}] finished successfully'.format(command))
                else:
                    logging.error('CUSTOM_DATAFLOW_JOB_LOG: [{}] failed with status code {}'.format(command, status))
            for command in install_cmd:
                status = self.run_command(command)
                if status == 0:
                    logging.info('CUSTOM_DATAFLOW_JOB_LOG: [{}] finished successfully'.format(command))
                else:
                    logging.error('CUSTOM_DATAFLOW_JOB_LOG: [{}] failed with status code {}'.format(command, status))
        except Exception as e:
            logging.error('CUSTOM_DATAFLOW_JOB_LOG: [{}] caught exception: {}'.format(command, e))


REQUIRED_PACKAGES = [
]

print("======\nRunning setup.py\n==========")

setuptools.setup(
    name='main_setup',
    version='1.0.0',
    description='DataFlow worker',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        'install': install,
        'CustomCommands': CustomCommands,
    }
)
```
Upvotes: 0
Views: 1851
Reputation: 6572
Why doesn't the subnetwork have internet access? You can create a Cloud Router and a Cloud NAT gateway on Google Cloud so that the (Dataflow) VM IPs are not exposed externally to the internet.
The Router is created for a VPC network (your subnetwork is in this VPC), and the NAT gateway is then created with that Router.
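A minimal gcloud sketch, assuming placeholder resource names (dataflow-router, dataflow-nat) and your own VPC network and region:

```
# Sketch only: replace the network, region and resource names with your own.
gcloud compute routers create dataflow-router \
    --network=YOUR_VPC_NETWORK \
    --region=YOUR_REGION

gcloud compute routers nats create dataflow-nat \
    --router=dataflow-router \
    --region=YOUR_REGION \
    --auto-allocate-nat-external-ips \
    --nat-all-subnet-ip-ranges
```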
Then it will be much easier to download your packages from PyPI via the setup.py file.
Example with the oracledb package from PyPI in the setup file:
```
from glob import glob
from setuptools import find_packages, setup

setup(
    name="lib",
    version="0.0.1",
    install_requires=['oracledb==1.0.3'],
    packages=find_packages(),
)
```
Then Dataflow will install the package on the workers without any problem.
Upvotes: 0
Reputation: 116
Have you verified that the Dataflow workers definitely have access to that GCS bucket? That could be causing problems here.
In general, I believe the recommended path for this sort of thing is to use the --extra_package flag (https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#local-or-nonpypi); you may have more luck doing that.
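For example, a sketch based on the template-creation command from the question (the local wheel path is an assumption; the file would first need to be downloaded somewhere the launcher can read):

```
# Sketch only: reuses the placeholders from the question; the local wheel path is illustrative.
python3 -m <python_file_name> \
    --runner DataflowRunner \
    --project <project_id> \
    --staging_location <gcs_staging> \
    --temp_location <gcs_temp> \
    --template_location <gcs_template> \
    --region <region> \
    --extra_package ./oracledb-1.0.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
```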
Upvotes: 0