luigi Workflows with multiple async calls

I'm attempting to implement a Luigi workflow that interfaces with OneDrive using msgraph in three distinct steps:

# simplified for the sake of clarity/brevity

class RegisterTask(luigi.Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # custom class that manages targets across tasks
        self.register = TargetRegister(self)

class OneDriveItemsExtractTask(RegisterTask)
    def output(self):
        return self.register.generate_output(PickleTarget)

    def run(self):
        data = OneDriveData(
            "my_connection", 
            "my_parent_directory", 
            ["item1.csv", "item2.csv"],
            "csv"
            )
        items = asyncio.run(get_onedrive_items(data))
        item_ids = [item.id for item in items]
        self.output.write(item_ids)


class OneDriveContentExtractTask(RegisterTask)
    def requires(self):
        return OneDriveItemsExtractTask(**self.param_kwargs)

    def output(self):
        return self.register.generate_output(ParquetTarget)

    def run(self):
        item_ids = self.input().read()
        content = asyncio.run(get_onedrive_content("my_connection", item_ids))


class DeleteOneDriveItemsTask(RegisterTask)
    def requires(self):
        return {
        "item_ids": OneDriveItemsExtractTask(**self.param_kwargs),
        "extract_compete": OneDriveContentExtractTask(**self.param_kwargs),
        }

    def output(self):
        return self.register.generate_output(ParquetTarget)

    def run(self):
        item_ids = self.input()["item_ids"].read()
        asyncio.run(delete_onedrive_items("my_connection", item_ids))

The tasks and workflow pass unit tests when isolated from the msgraph dependency with testing fixtures, both when submitted to the Luigi scheduler and when calling the run() method after instantiation.

Running the tasks outside of the Luigi scheduler without testing fixtures also works as expected.

However, running the workflow in integration testing against a real graph endpoint results in RuntimeError: Event loop is closed.

Here's the workaround I came up with, which is suboptimal:

class OneDriveContentExtractTask(RegisterTask)
    def output(self):
        return self.register.generate_output(ParquetTarget)

    async def _async_operations(self, data:OneDriveData):
        items = await get_onedrive_items(data)
        content = await get_onedrive_content(data, items)

        if self.delete_extracted_items and not content.empty():
            await delete_onedrive_items(data.connection, items)

        return content

    def run(self):
        data = OneDriveData(
            "my_connection", 
            "my_parent_directory", 
            ["item1.csv", "item2.csv"],
            "csv"
            )
        df = asyncio.run(self._async_operations(data))

        self.output().write(df)

This works. It seems what breaks luigi is having multiple async loops in a single workflow. I'd appreciate any insight into how to properly manage multiple async calls in this context. I'm using luigi 3.4.0.

Upvotes: 0

Views: 82

Answers (0)

Related Questions