Google Dataflow: global name is not defined - apache beam





Locally I have this:



from shapely.geometry import Point
<...>
class GeoDataIngestion:
    def parse_method(self, string_input):
        Place = Point(float(values[2]), float(values[3]))
<...>


I run this with Python 2.7 and everything goes well.



After that, I tried to test it with the Dataflow runner, but while running I got this error:



NameError: global name 'Point' is not defined




The pipeline:



geo_data = (raw_data
            | 'Geo data transform' >> beam.Map(lambda s: geo_ingestion.parse_method(s)))




I have read other posts and I think this should work, but I'm not sure whether there is something special about Google Dataflow here.



I also tried:



import shapely.geometry
<...>
Place = shapely.geometry.Point(float(values[2]), float(values[3]))


With the same result



NameError: global name 'shapely' is not defined


Any idea?





In Google Cloud, if I try it in my virtual environment, I can do it without any problem:



(env) ...@cloudshell:~ ()$ python
Python 2.7.13 (default, Sep 26 2018, 18:42:22)
[GCC 6.3.0 20170516] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from shapely.geometry import Point
>>> Var = Point(-5.020751953125, 39.92237576385941)




EXTRA:





Error using requirements.txt



Collecting Shapely==1.6.4.post1 (from -r req.txt (line 2))
Using cached https://files.pythonhosted.org/packages/7d/3c/0f09841db07aabf9cc387662be646f181d07ed196e6f60ce8be5f4a8f0bd/Shapely-1.6.4.post1.tar.gz
Saved c:\<...>\shapely-1.6.4.post1.tar.gz
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "c:\<...>\temp\pip-download-kpg5ca\Shapely\setup.py", line 80, in <module>
    from shapely._buildcfg import geos_version_string, geos_version,
  File "shapely\_buildcfg.py", line 200, in <module>
    lgeos = CDLL("geos_c.dll")
  File "C:\Python27\Lib\ctypes\__init__.py", line 366, in __init__
    self._handle = _dlopen(self._name, mode)
WindowsError: [Error 126] The specified module could not be found




Error using setup.py



Changing setup.py like this:



CUSTOM_COMMANDS = [
    ['apt-get', 'update'],
    ['apt-get', '--assume-yes', 'install', 'libgeos-dev'],
    ['pip', 'install', 'Shapely'],
    ['echo', 'Custom command worked!']
]


The result is as if no package were installed, because I get the same error as at the beginning:



NameError: global name 'Point' is not defined




setup.py file:



from __future__ import absolute_import
from __future__ import print_function
import subprocess
from distutils.command.build import build as _build
import setuptools


class build(_build):  # pylint: disable=invalid-name
    sub_commands = _build.sub_commands + [('CustomCommands', None)]


CUSTOM_COMMANDS = [
    ['apt-get', 'update'],
    ['apt-get', '--assume-yes', 'install', 'libgeos-dev'],
    ['pip', 'install', 'Shapely']]


class CustomCommands(setuptools.Command):
    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        print('Running command: %s' % command_list)
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # Can use communicate(input='y\n'.encode()) if the command run requires
        # some confirmation.
        stdout_data, _ = p.communicate()
        print('Command output: %s' % stdout_data)
        if p.returncode != 0:
            raise RuntimeError(
                'Command %s failed: exit code: %s' % (command_list, p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)


REQUIRED_PACKAGES = ['Shapely']

setuptools.setup(
    name='dataflow',
    version='0.0.1',
    description='Dataflow set workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    }
)




pipeline options:



pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(SetupOptions).setup_file = 'C:<...>setup.py'

with beam.Pipeline(options=pipeline_options) as p:




The call:



python -m dataflow --project XXX --temp_location gs://YYY --runner DataflowRunner --region europe-west1 --setup_file C:<...>setup.py




The beginning of the log (before Dataflow waits for the data):



INFO:root:Defaulting to the temp_location as staging_location: gs://iotbucketdetector/test/prueba
C:\Users\<...>~1\Desktop\PROYEC~2\env\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py:816: DeprecationWarning: options is deprecated since First stable release.. References to <pipeline>.options will not be supported
  transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pipeline.pb...
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pipeline.pb
INFO:root:Executing command: ['C:\Users\<...>~1\Desktop\PROYEC~2\env\Scripts\python.exe', 'setup.py', 'sdist', '--dist-dir', 'c:\users\<...>~1\appdata\local\temp\tmpakq8bs']
running sdist
running egg_info
writing requirements to dataflow.egg-info\requires.txt
writing dataflow.egg-info\PKG-INFO
writing top-level names to dataflow.egg-info\top_level.txt
writing dependency_links to dataflow.egg-info\dependency_links.txt
reading manifest file 'dataflow.egg-info\SOURCES.txt'
writing manifest file 'dataflow.egg-info\SOURCES.txt'
warning: sdist: standard file not found: should have one of README, README.rst, README.txt, README.md

running check
warning: check: missing required meta-data: url

warning: check: missing meta-data: either (author and author_email) or (maintainer and maintainer_email) must be supplied

creating dataflow-0.0.1
creating dataflow-0.0.1\dataflow.egg-info
copying files to dataflow-0.0.1...
copying setup.py -> dataflow-0.0.1
copying dataflow.egg-info\PKG-INFO -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\SOURCES.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\dependency_links.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\requires.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\top_level.txt -> dataflow-0.0.1\dataflow.egg-info
Writing dataflow-0.0.1\setup.cfg
Creating tar archive
removing 'dataflow-0.0.1' (and everything under it)
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/workflow.tar.gz...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/workflow.tar.gz
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pickled_main_session...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pickled_main_session
INFO:root:Downloading source distribtution of the SDK from PyPi
INFO:root:Executing command: ['C:\Users\<...>~1\Desktop\PROYEC~2\env\Scripts\python.exe', '-m', 'pip', 'download', '--dest', 'c:\users\<...>~1\appdata\local\temp\tmpakq8bs', 'apache-beam==2.5.0', '--no-deps', '--no-binary', ':all:']
Collecting apache-beam==2.5.0
Using cached https://files.pythonhosted.org/packages/c6/96/56469c57cb043f36bfdd3786c463fbaeade1e8fcf0593ec7bc7f99e56d38/apache-beam-2.5.0.zip
Saved c:\users\<...>~1\appdata\local\temp\tmpakq8bs\apache-beam-2.5.0.zip
Successfully downloaded apache-beam
INFO:root:Staging SDK sources from PyPI to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar
INFO:root:Downloading binary distribtution of the SDK from PyPi
INFO:root:Executing command: ['C:\Users\<...>~1\Desktop\PROYEC~2\env\Scripts\python.exe', '-m', 'pip', 'download', '--dest', 'c:\users\<...>~1\appdata\local\temp\tmpakq8bs', 'apache-beam==2.5.0', '--no-deps', '--only-binary', ':all:', '--python-version', '27', '--implementation', 'cp', '--abi', 'cp27mu', '--platform', 'manylinux1_x86_64']
Collecting apache-beam==2.5.0
Using cached https://files.pythonhosted.org/packages/ff/10/a59ba412f71fb65412ec7a322de6331e19ec8e75ca45eba7a0708daae31a/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
Saved c:\users\<...>~1\appdata\local\temp\tmpakq8bs\apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
Successfully downloaded apache-beam
INFO:root:Staging binary distribution of the SDK from PyPI to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
INFO:root:Create job: <Job
createTime: u'2018-11-20T07:45:28.050865Z'
currentStateTime: u'1970-01-01T00:00:00Z'
id: u'2018-11-19_23_45_27-14221834310382472741'
location: u'europe-west1'
name: u'beamapp-<...>-1120074505-586000'
projectId: u'poc-cloud-209212'
stageStates:
steps:
tempFiles:
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)>









Tags: python, google-cloud-dataflow, apache-beam, shapely.geometry






asked Nov 16 '18 at 12:21 by IoT user, edited Nov 20 '18 at 8:03

  • Are you using save_main_session = True for global context as in this example?

    – Guillem Xercavins
    Nov 16 '18 at 16:39











  • Yes: pipeline_options.view_as(SetupOptions).save_main_session = True and pipeline_options.view_as(StandardOptions).streaming = True

    – IoT user
    Nov 19 '18 at 7:57



















2 Answers














This is because you need to tell Dataflow to install the packages you want.



Brief documentation is here.



Simply put, for a PyPI package like shapely, you can do the following to ensure all dependencies are installed:




  1. pip freeze > requirements.txt

  2. Remove all unrelated packages from requirements.txt

  3. Run your pipeline with --requirements_file requirements.txt (see the sketch below)
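
For illustration, a minimal sketch of that route (the Shapely pin is only an example; keep whatever versions pip freeze reported, and the module name dataflow mirrors the call from the question):

# requirements.txt -- keep only the packages the pipeline actually imports
Shapely==1.6.4.post1

python -m dataflow --project XXX --temp_location gs://YYY --runner DataflowRunner --requirements_file requirements.txt

Dataflow downloads these packages while staging the job and pip installs them on every worker before your code runs. Note that the staging step fetches source distributions only, which appears to be why building Shapely failed on the Windows machine in the EXTRA section of the question.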


Going further, if you want to do something like installing a Linux package via apt-get or using your own Python modules, take a look at this official example. You need to set up a setup.py for this and change your pipeline command to pass --setup_file setup.py, as sketched below.
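
A sketch only; the other flags just mirror the call from the question:

python -m dataflow --project XXX --temp_location gs://YYY --runner DataflowRunner --region europe-west1 --setup_file ./setup.py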



For PyPI modules, use REQUIRED_PACKAGES as in that example:



REQUIRED_PACKAGES = [
    'numpy', 'shapely'
]


If you are using pipeline options, then add setup.py like this:



pipeline_options = {
    'project': PROJECT,
    'staging_location': 'gs://' + BUCKET + '/staging',
    'runner': 'DataflowRunner',
    'job_name': 'test',
    'temp_location': 'gs://' + BUCKET + '/temp',
    'save_main_session': True,
    'setup_file': './setup.py'
}
options = PipelineOptions.from_dictionary(pipeline_options)
p = beam.Pipeline(options=options)





answered Nov 19 '18 at 9:21 by MatrixTai, edited Nov 20 '18 at 3:32

  • Thanks! I tried with requirements.txt, but I had a problem (added to the main post). So according to this, I think I have to use the setup.py to use apt-get, don't I? What change should I make in the example? Edit CUSTOM_COMMANDS = [ ['echo', 'Custom command worked!']]?

    – IoT user
    Nov 19 '18 at 10:18













  • That's strange; though I use the REQUIRED_PACKAGES (as edited), I can manage to install the required pip package. Have you added setup.py to the pipeline options?

    – MatrixTai
    Nov 20 '18 at 3:30











  • Quite strange, because even if I add that PipelineOptions with setup_file, it's as if what I wrote in setup.py has no effect. I've attached more information so you can see what could be wrong in what I'm doing.

    – IoT user
    Nov 20 '18 at 8:05











  • @IoTuser, I will make a test case for shapely.

    – MatrixTai
    Nov 21 '18 at 1:28











  • I will wait for your result and hope you can make it! Thanks!

    – IoT user
    Nov 21 '18 at 7:39

































Import inside the function + setup.py:



class GeoDataIngestion:
    def parse_method(self, string_input):
        from shapely.geometry import Point
        place = Point(float(values[2]), float(values[3]))


setup.py with:



REQUIRED_PACKAGES = ['shapely']
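
For reference, a minimal sketch of such a setup.py (name, version and description are placeholders modelled on the setup.py from the question, with the custom apt-get commands dropped):

import setuptools

# shapely is the only extra package the workers need for this pipeline
REQUIRED_PACKAGES = ['shapely']

setuptools.setup(
    name='dataflow',
    version='0.0.1',
    description='Dataflow workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
)

With --setup_file, Dataflow builds this package into a source distribution (the workflow.tar.gz visible in the log in the question), stages it, and pip installs it on each worker, so install_requires is what makes shapely importable there.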





answered Nov 23 '18 at 9:12 by IoT user