Reputation: 1300
In local I have this:
from shapely.geometry import Point
<...>
class GeoDataIngestion:
def parse_method(self, string_input):
Place = Point(float(values[2]), float(values[3]))
<...>
I run this, with python 2.7 and all goes well
After that, I try to test it with the dataflow runner but while running I got this error:
NameError: global name 'Point' is not defined
The pipeline:
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: geo_ingestion.parse_method(s))
I have read other post and I think this should work, but i'm not sure if there are something special with Google Dataflow in this
I also tried:
import shapely.geometry
<...>
Place = shapely.geometry.Point(float(values[2]), float(values[3]))
With the same result
NameError: global name 'shapely' is not defined
Any idea?
In Google Cloud, If I tried in my virtual enviroment, I can do it without any problem:
(env) ...@cloudshell:~ ()$ python
Python 2.7.13 (default, Sep 26 2018, 18:42:22)
[GCC 6.3.0 20170516] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from shapely.geometry import Point
Var = Point(-5.020751953125, 39.92237576385941)
EXTRA:
Error using requirements.txt
Collecting Shapely==1.6.4.post1 (from -r req.txt (line 2))
Using cached https://files.pythonhosted.org/packages/7d/3c/0f09841db07aabf9cc387662be646f181d07ed196e6f60ce8be5f4a8f0bd/Shapely-1.6.4.post1.tar.gz
Saved c:\<...>\shapely-1.6.4.post1.tar.gz
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "c:\<...>\temp\pip-download-kpg5ca\Shapely\setup.py", line 80, in <module>
from shapely._buildcfg import geos_version_string, geos_version, \
File "shapely\_buildcfg.py", line 200, in <module>
lgeos = CDLL("geos_c.dll")
File "C:\Python27\Lib\ctypes\__init__.py", line 366, in __init__
self._handle = _dlopen(self._name, mode)
WindowsError: [Error 126] No se puede encontrar el m¢dulo especificado
Error using setup.py
Setup.py like this changing:
CUSTOM_COMMANDS = [
['apt-get', 'update'],
['apt-get', '--assume-yes', 'install', 'libgeos-dev'],
['pip', 'install', 'Shapely'],
['echo', 'Custom command worked!']
]
The result is like no packet would be installed, because I get the error from the beginning:
NameError: global name 'Point' is not defined
setup.py file:
from __future__ import absolute_import
from __future__ import print_function
import subprocess
from distutils.command.build import build as _build
import setuptools
class build(_build): # pylint: disable=invalid-name
sub_commands = _build.sub_commands + [('CustomCommands', None)]
CUSTOM_COMMANDS = [
['apt-get', 'update'],
['apt-get', '--assume-yes', 'install', 'libgeos-dev'],
['pip', 'install', 'Shapely']]
class CustomCommands(setuptools.Command):
def initialize_options(self):
pass
def finalize_options(self):
pass
def RunCustomCommand(self, command_list):
print('Running command: %s' % command_list)
p = subprocess.Popen(
command_list,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Can use communicate(input='y\n'.encode()) if the command run requires
# some confirmation.
stdout_data, _ = p.communicate()
print('Command output: %s' % stdout_data)
if p.returncode != 0:
raise RuntimeError(
'Command %s failed: exit code: %s' % (command_list, p.returncode))
def run(self):
for command in CUSTOM_COMMANDS:
self.RunCustomCommand(command)
REQUIRED_PACKAGES = ['Shapely']
setuptools.setup(
name='dataflow',
version='0.0.1',
description='Dataflow set workflow package.',
install_requires=REQUIRED_PACKAGES,
packages=setuptools.find_packages(),
cmdclass={
'build': build,
'CustomCommands': CustomCommands,
}
)
pipeline options:
pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(SetupOptions).setup_file = 'C:\<...>\setup.py'
with beam.Pipeline(options=pipeline_options) as p:
The call:
python -m dataflow --project XXX --temp_location gs://YYY --runner DataflowRunner --region europe-west1 --setup_file C:\<...>\setup.py
The beginning log: (before dataflow wait for the data)
INFO:root:Defaulting to the temp_location as staging_location: gs://iotbucketdetector/test/prueba
C:\Users\<...>~1\Desktop\PROYEC~2\env\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py:816: DeprecationWarning: options is deprecated since First stable release.. References to <pipeline>.options will
not be supported
transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pipeline.pb...
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pipeline.pb
INFO:root:Executing command: ['C:\\Users\\<...>~1\\Desktop\\PROYEC~2\\env\\Scripts\\python.exe', 'setup.py', 'sdist', '--dist-dir', 'c:\\users\\<...>~1\\appdata\\local\\temp\\tmpakq8bs']
running sdist
running egg_info
writing requirements to dataflow.egg-info\requires.txt
writing dataflow.egg-info\PKG-INFO
writing top-level names to dataflow.egg-info\top_level.txt
writing dependency_links to dataflow.egg-info\dependency_links.txt
reading manifest file 'dataflow.egg-info\SOURCES.txt'
writing manifest file 'dataflow.egg-info\SOURCES.txt'
warning: sdist: standard file not found: should have one of README, README.rst, README.txt, README.md
running check
warning: check: missing required meta-data: url
warning: check: missing meta-data: either (author and author_email) or (maintainer and maintainer_email) must be supplied
creating dataflow-0.0.1
creating dataflow-0.0.1\dataflow.egg-info
copying files to dataflow-0.0.1...
copying setup.py -> dataflow-0.0.1
copying dataflow.egg-info\PKG-INFO -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\SOURCES.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\dependency_links.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\requires.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\top_level.txt -> dataflow-0.0.1\dataflow.egg-info
Writing dataflow-0.0.1\setup.cfg
Creating tar archive
removing 'dataflow-0.0.1' (and everything under it)
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/workflow.tar.gz...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/workflow.tar.gz
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pickled_main_session...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pickled_main_session
INFO:root:Downloading source distribtution of the SDK from PyPi
INFO:root:Executing command: ['C:\\Users\\<...>~1\\Desktop\\PROYEC~2\\env\\Scripts\\python.exe', '-m', 'pip', 'download', '--dest', 'c:\\users\\<...>~1\\appdata\\local\\temp\\tmpakq8bs', 'apache-beam==2.5.0', '--no-d
eps', '--no-binary', ':all:']
Collecting apache-beam==2.5.0
Using cached https://files.pythonhosted.org/packages/c6/96/56469c57cb043f36bfdd3786c463fbaeade1e8fcf0593ec7bc7f99e56d38/apache-beam-2.5.0.zip
Saved c:\users\<...>~1\appdata\local\temp\tmpakq8bs\apache-beam-2.5.0.zip
Successfully downloaded apache-beam
INFO:root:Staging SDK sources from PyPI to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar
INFO:root:Downloading binary distribtution of the SDK from PyPi
INFO:root:Executing command: ['C:\\Users\\<...>~1\\Desktop\\PROYEC~2\\env\\Scripts\\python.exe', '-m', 'pip', 'download', '--dest', 'c:\\users\\<...>~1\\appdata\\local\\temp\\tmpakq8bs', 'apache-beam==2.5.0', '--no-d
eps', '--only-binary', ':all:', '--python-version', '27', '--implementation', 'cp', '--abi', 'cp27mu', '--platform', 'manylinux1_x86_64']
Collecting apache-beam==2.5.0
Using cached https://files.pythonhosted.org/packages/ff/10/a59ba412f71fb65412ec7a322de6331e19ec8e75ca45eba7a0708daae31a/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
Saved c:\users\<...>~1\appdata\local\temp\tmpakq8bs\apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
Successfully downloaded apache-beam
INFO:root:Staging binary distribution of the SDK from PyPI to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
INFO:root:Create job: <Job
createTime: u'2018-11-20T07:45:28.050865Z'
currentStateTime: u'1970-01-01T00:00:00Z'
id: u'2018-11-19_23_45_27-14221834310382472741'
location: u'europe-west1'
name: u'beamapp-<...>-1120074505-586000'
projectId: u'poc-cloud-209212'
stageStates: []
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)>
Upvotes: 2
Views: 4272
Reputation: 1300
Import inside the function + setup.py:
class GeoDataIngestion:
def parse_method(self, string_input):
from shapely.geometry import Point
place = Point(float(values[2]), float(values[3]))
setup.py with:
REQUIRED_PACKAGES = ['shapely']
Upvotes: 3
Reputation: 2502
This is because you need to tell dataflow to install package you want.
Briefly documentation is here.
Simply speak, for PyPi package like shapely, you can do the following to ensure all dependencies installed.
pip freeze > requirements.txt
requirements.txt
--requirements_file requirements.txt
Or even more, if you want to do something like install linux package by apt-get
or using your own python module. Take a look on this official example. You need to setup a setup.py
for this and change your pipeline command with
--setup_file setup.py
.
For PyPi module, use the REQUIRED_PACKAGES
in example.
REQUIRED_PACKAGES = [
'numpy','shapely'
]
If you are use pipeline options, then add setup.py as
pipeline_options = {
'project': PROJECT,
'staging_location': 'gs://' + BUCKET + '/staging',
'runner': 'DataflowRunner',
'job_name': 'test',
'temp_location': 'gs://' + BUCKET + '/temp',
'save_main_session': True,
'setup_file':'.\setup.py'
}
options = PipelineOptions.from_dictionary(pipeline_options)
p = beam.Pipeline(options=options)
Upvotes: 4