Dave Liu

Reputation: 1142

Flyte 0.16.2: Error loading Blob - How to get Types.Blob.fetch() to work in a @task-decorated function?

I have a Flyte task function like this:

@task
def do_stuff(framework_obj):
    framework_obj.get_outputs()  # This calls Types.Blob.fetch(some_uri)

I'm trying to load a blob URI using flytekit.sdk.types.Types.Blob.fetch, but I get this error:

ERROR:flytekit: Exception when executing No temporary file system is present.  Either call this method from within the context of a task or surround with a 'with LocalTestFileSystem():' block.  Or specify a path when calling this function.  Note: Cleanup is not automatic when a path is specified.
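The error message lists three ways to give fetch() somewhere to write the blob: the temporary file system that exists while a task is executing, an enclosing LocalTestFileSystem block, or an explicit path. Below is a toy, stdlib-only model of that rule. The names local_test_file_system and fetch are hypothetical stand-ins, not flytekit's code, and fetch only creates an empty file instead of downloading anything. Under this reading, the "path" in the message is the local destination for the download, which would explain why cleanup is not automatic when you pass one.

```python
import contextlib
import os
import shutil
import tempfile

# Hypothetical stand-in for the framework's stack of active scratch directories.
_active_dirs = []


@contextlib.contextmanager
def local_test_file_system():
    """Toy analogue of LocalTestFileSystem: a scratch dir removed on exit."""
    scratch = tempfile.mkdtemp()
    _active_dirs.append(scratch)
    try:
        yield scratch
    finally:
        _active_dirs.pop()
        shutil.rmtree(scratch, ignore_errors=True)


def fetch(remote_uri, local_path=None):
    """Toy analogue of Blob.fetch: decide where the downloaded blob would go."""
    if local_path is None:
        if not _active_dirs:
            # Mirrors the first sentence of the real error message.
            raise RuntimeError("No temporary file system is present.")
        local_path = os.path.join(_active_dirs[-1], os.path.basename(remote_uri))
    # A real implementation would download remote_uri here; we create an empty file.
    with open(local_path, "wb"):
        pass
    return local_path
```

With an explicit local_path, the file lives outside any managed scratch directory, so nothing removes it for you.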

I can confirm that I can load blobs inside a "with LocalTestFileSystem():" block in tests. However, when I actually run a workflow I still get this error, even though the function that does the blob processing is decorated with @task, so it's definitely a Flyte task. I also confirmed that the task node exists in the Flyte web console.

What path is the error referencing and how do I call this function appropriately?

Using Flyte Version 0.16.2

Upvotes: 0

Views: 269

Answers (1)

kaylindris

Reputation: 88

Could you please give a bit more information about the code? Is this flytekit version 0.15.x? I'm a bit confused, since that version shouldn't have the @task decorator; it should only have @python_task, which is the older API. If you want to use the new Python-native typing API, you should install flytekit==0.17.0 instead.
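To check which flytekit is actually installed in the environment that runs the task, something like the following sketch works with only the standard library. The 0.17.0 threshold comes from the advice above to install flytekit==0.17.0, not from a flytekit changelog, and the helper names are hypothetical.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# Threshold taken from the recommendation to install flytekit==0.17.0.
MIN_TASK_API = (0, 17, 0)


def parse_version(v):
    """Extract (major, minor, patch) from strings like '0.16.2' or '0.16.0b1'."""
    m = re.match(r"(\d+)\.(\d+)\.(\d+)", v)
    if m is None:
        raise ValueError(f"unrecognized version string: {v!r}")
    return tuple(int(part) for part in m.groups())


def meets_minimum(v, minimum=MIN_TASK_API):
    """True if version string v is at least the given (major, minor, patch)."""
    return parse_version(v) >= minimum


def installed_flytekit_version():
    """Return the installed flytekit version string, or None if it is absent."""
    try:
        return version("flytekit")
    except PackageNotFoundError:
        return None
```

Pre-release suffixes are ignored, so "0.16.0b1" is treated as 0.16.0 and falls below the threshold.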

Also, could you point to the documentation you're looking at? We've updated the docs a fair amount recently, so maybe there's some confusion around that. These are the examples worth looking at. There are also two new Python classes, FlyteFile and FlyteDirectory, that have replaced the Blob class in flytekit (though Blob is still what the type is called in the IDL).

(I would've left this as a comment, but I don't have enough reputation yet.)

Here is some code that fetches the outputs of an earlier execution and reads from a file output:

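from flytekit import task
from flytekit.clients.friendly import SynchronousFlyteClient
from flytekit.core.context_manager import FlyteContext
from flytekit.core.type_engine import TypeEngine
from flytekit.models.core.identifier import WorkflowExecutionIdentifier
from flytekit.types.file import FlyteFile

@task
def task_file_reader():
    # Connect to FlyteAdmin using its in-cluster address (no TLS).
    client = SynchronousFlyteClient("flyteadmin.flyte.svc.cluster.local:81", insecure=True)
    # Identify the earlier workflow execution whose outputs we want to read.
    exec_id = WorkflowExecutionIdentifier(
        project="flytesnacks",
        domain="development",
        name="iaok0qy6k1",
    )
    data = client.get_execution_data(exec_id)
    # "o0" is the name of the file output to read.
    lit = data.full_outputs.literals["o0"]

    # Convert the Flyte literal back into a Python FlyteFile.
    ctx = FlyteContext.current_context()
    ff = TypeEngine.to_python_value(ctx, lv=lit, expected_python_type=FlyteFile)
    # FlyteFile is path-like, so open() fetches a local copy and reads it.
    with open(ff, "rb") as fh:
        print(fh.readlines())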
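The open(ff, "rb") call works because FlyteFile behaves as an os.PathLike object that only materializes a local copy when asked for its filesystem path. The sketch below is a simplified, hypothetical stand-in for that idea (LazyLocalFile and the downloader callback are made up for illustration, not flytekit's implementation): the download runs once, on the first __fspath__ call.

```python
import os


class LazyLocalFile(os.PathLike):
    """Hypothetical FlyteFile-like wrapper around a remote location.

    Nothing is downloaded until something (e.g. open()) asks for the path.
    """

    def __init__(self, remote_path, downloader):
        self.remote_path = remote_path
        self._downloader = downloader  # callable: remote path -> local path
        self._local_path = None

    def __fspath__(self):
        # Download on first use, then reuse the cached local copy.
        if self._local_path is None:
            self._local_path = self._downloader(self.remote_path)
        return self._local_path
```

Because the class implements __fspath__, it can be passed directly to open(), os.path functions, or shutil without the caller knowing a download happened.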

Upvotes: 1
