diegoruizbarbero

Reputation: 121

Can't run basic PyFlink example

I have this toy pipeline

from pyflink.datastream import StreamExecutionEnvironment


def pipeline():
    # Create environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    ds = env.read_text_file('file:///home/user/myfile.json')
    # ds.map(lambda i: i)
    ds.print()

    # Execute job
    env.execute('DynamicStockpilePipeline')


if __name__ == '__main__':
    pipeline()

That runs just fine, but every time I try to uncomment the mapping stage, be it a dummy inline lambda or a MapFunction, it explodes with:

Caused by: java.io.IOException: Failed to execute the command: python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output: Traceback (most recent call last):
  File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'pyflink'

I'm using a pyenv 3.8 interpreter. Does anyone know why the basic data source and output stages run fine, but the map stage says the pyflink module is missing?

Addendum: this only happens in PyCharm; it does not happen when I run the script from the console, so I suspect it is something between PyCharm and pyenv.

Upvotes: 0

Views: 1255

Answers (3)

Lyon0804

Reputation: 11

Use pip install apache-flink instead.
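To make sure the package lands in the interpreter PyCharm actually uses (and not whichever pip happens to be first on PATH), one way is to invoke pip through that interpreter and then verify the import from the same interpreter; a sketch, assuming python here is PyCharm's configured interpreter:

```shell
# Install PyFlink into exactly this interpreter's site-packages
python -m pip install apache-flink

# Verify that pyflink now resolves from the same interpreter
python -c "import pyflink; print(pyflink.__file__)"
```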

Upvotes: 0

diegoruizbarbero

Reputation: 121

I got rid of this error by adding the pyenv 3.8 interpreter as a virtual environment in PyCharm instead of selecting it under the system interpreters option. I guess something about the PYTHONPATH gets broken with the latter option.
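This fits the stack trace: the Java side launches the Python worker with the plain python command (the trace shows it running python -c import pyflink;...), which is resolved from PATH rather than from the interpreter running the script. A minimal diagnostic sketch (not PyFlink-specific) that shows whether the two interpreters differ:

```python
import shutil
import sys

# The interpreter actually running this script (the one PyCharm selected):
print("script interpreter:", sys.executable)

# The interpreter Flink's Java side will find when it executes the plain
# `python` command, resolved from PATH (may be None if not on PATH):
print("worker interpreter:", shutil.which("python"))

# If these two paths differ, `import pyflink` can succeed in this script
# yet fail in the worker process, producing the ModuleNotFoundError above.
```

PyFlink also lets you pin the worker interpreter explicitly with env.set_python_executable(sys.executable), assuming a PyFlink version that exposes that method on StreamExecutionEnvironment.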

Upvotes: 1

ChangLi

Reputation: 813

Maybe you can post the code that actually triggers the error.

From the context, this may be a usage error: the print call needs to follow the map call, like so:

from pyflink.datastream import StreamExecutionEnvironment


def pipeline():
    # Create environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    ds = env.read_text_file('file:///home/user/myfile.json')
    ds.map(lambda i: i).print()

    # Execute job
    env.execute('DynamicStockpilePipeline')


if __name__ == '__main__':
    pipeline()

Upvotes: 0
