got2nosth

Reputation: 618

Kedro: ValueError: Pipeline does not contain nodes named ['preprocess_companies_node']

Similar to the question described earlier, I followed the spaceflights tutorial, and at the create-pipeline step I got the following error when running kedro run --node=preprocess_companies_node:

ValueError: Pipeline does not contain nodes named ['preprocess_companies_node'].

The relevant files are set up exactly as instructed in the tutorial:

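# src/kedro_tutorial/pipelines/data_processing/pipeline.py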
from kedro.pipeline import Pipeline, node

from .nodes import preprocess_companies, preprocess_shuttles

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
        ]
    )
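
# src/kedro_tutorial/pipelines/data_processing/nodes.py
import pandas as pd
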
def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles
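
The nodes above also call a few small helper functions defined in the same nodes.py, copied from the tutorial. I haven't pasted them, but roughly they look like this (the exact parsing details may differ between tutorial versions):

def _is_true(x: pd.Series) -> pd.Series:
    # the raw CSVs store booleans as "t"/"f" strings
    return x == "t"


def _parse_percentage(x: pd.Series) -> pd.Series:
    # e.g. "95%" -> 0.95
    return x.str.replace("%", "", regex=False).astype(float) / 100


def _parse_money(x: pd.Series) -> pd.Series:
    # e.g. "$1,325.0" -> 1325.0
    return (
        x.str.replace("$", "", regex=False)
        .str.replace(",", "", regex=False)
        .astype(float)
    )


# src/kedro_tutorial/pipeline_registry.py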
from typing import Dict

from kedro.pipeline import Pipeline

from kedro_tutorial.pipelines import data_processing as dp


def register_pipelines() -> Dict[str, Pipeline]:
    """Register the project's pipeline.

    Returns:
        A mapping from a pipeline name to a ``Pipeline`` object.

    """
    data_processing_pipeline = dp.create_pipeline()

    return {
        "__default__": data_processing_pipeline,
        "dp": data_processing_pipeline,
    }

I made sure I have registered a __default__ pipeline, and my node name matches the name in the command exactly: preprocess_companies_node.

My Kedro version is 0.16.6 and my Python version is 3.7.10.

Any idea what I did wrong here?

Thank you.

Upvotes: 1

Views: 1835

Answers (1)

Waylon Walker

Reputation: 563

The issue is that you are following the tutorial for version 0.17.3+ while using kedro==0.16.6. This is an easy mistake to make, don't fret. The pipeline_registry.py module was only introduced in 0.17.3, so on 0.16.6 it is never loaded. Your options are to upgrade to the latest Kedro version, or to register your pipelines in a module called hooks.py rather than pipeline_registry.py.

# src/<project_name>/hooks.py
"""Project hooks."""
from typing import Any, Dict, Iterable, Optional

from kedro.config import ConfigLoader
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.versioning import Journal

from sixteen.pipelines import data_engineering as de
from sixteen.pipelines import data_science as ds


class ProjectHooks:
    @hook_impl
    def register_pipelines(self) -> Dict[str, Pipeline]:
        """Register the project's pipeline.

        Returns:
            A mapping from a pipeline name to a ``Pipeline`` object.

        """
        data_engineering_pipeline = de.create_pipeline()
        data_science_pipeline = ds.create_pipeline()

        return {
            "de": data_engineering_pipeline,
            "ds": data_science_pipeline,
            "__default__": data_engineering_pipeline + data_science_pipeline,
        }

    @hook_impl
    def register_config_loader(self, conf_paths: Iterable[str]) -> ConfigLoader:
        return ConfigLoader(conf_paths)

    @hook_impl
    def register_catalog(
        self,
        catalog: Optional[Dict[str, Dict[str, Any]]],
        credentials: Dict[str, Dict[str, Any]],
        load_versions: Dict[str, str],
        save_version: str,
        journal: Journal,
    ) -> DataCatalog:
        return DataCatalog.from_config(
            catalog, credentials, load_versions, save_version, journal
        )


project_hooks = ProjectHooks()

You can generate a full working example for this Kedro version by running kedro new against it:

# these two bash commands are safe to run outside of a virtual environment
# pipx creates the virtual environment for you
pip install pipx
pipx run --spec kedro==0.16.6 kedro new

The rest of your code looks like valid 0.16.6 Kedro to me. Once you have moved your pipeline registration into hooks.py, run the kedro pipeline list command to confirm that Kedro is picking up your pipeline code.
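With the registry you posted, the output should look roughly like this (the exact formatting varies between Kedro versions):

$ kedro pipeline list
- __default__
- dp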

Upvotes: 3
