Build a Pipeline

A tutorial on building pipelines to orchestrate your ML workflow

A Kubeflow pipeline is a portable and scalable definition of a machine learning (ML) workflow. Each step in your ML workflow, such as preparing data or training a model, is an instance of a pipeline component. This document provides an overview of pipeline concepts and best practices, and instructions describing how to build an ML pipeline.

Before you begin

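  1. Run the following command to install the Kubeflow Pipelines SDK. If you run this command in a Jupyter notebook, restart the kernel after installing the SDK. The examples in this guide use SDK v1 APIs such as kfp.components.create_component_from_func; if they fail under a newer major version of the SDK, check which version is installed.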
$ pip install kfp --upgrade
  2. Import the kfp and kfp.components packages.
import kfp
import kfp.components as comp

Understanding pipelines

A Kubeflow pipeline is a portable and scalable definition of an ML workflow, based on containers. A pipeline is composed of a set of input parameters and a list of the steps in this workflow. Each step in a pipeline is an instance of a component, which is represented as an instance of ContainerOp.

You can use pipelines to:

  • Orchestrate repeatable ML workflows.
  • Accelerate experimentation by running a workflow with different sets of hyperparameters.
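
As a rough illustration of the second point, the sketch below enumerates hyperparameter combinations in plain Python. The helper argument_grid and the parameter names learning_rate and batch_size are hypothetical and are not part of the Kubeflow Pipelines SDK. Each resulting dictionary is the kind of value you might supply as the arguments of one pipeline run.

```python
from itertools import product


def argument_grid(**choices):
    """Return one argument dict per combination of the given choices.

    Hypothetical helper for this sketch; not part of the kfp package.
    """
    names = list(choices)
    return [dict(zip(names, combo))
            for combo in product(*(choices[name] for name in names))]


# Hypothetical pipeline parameters and candidate values.
runs = argument_grid(learning_rate=[0.01, 0.1], batch_size=[32, 64])

for arguments in runs:
    print(arguments)
```

Submitting one run per dictionary keeps the workflow definition fixed while only the parameters vary, which is what makes the runs comparable.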

Understanding pipeline components

A pipeline component is a containerized application that performs one step in a pipeline’s workflow. Pipeline components are defined in component specifications, which define the following:

  • The component’s interface, its inputs and outputs.
  • The component’s implementation, the container image and the command to execute.
  • The component’s metadata, such as the name and description of the component.

You can build components by defining a component specification for a containerized application, or you can use the Kubeflow Pipelines SDK to generate a component specification for a Python function. You can also reuse prebuilt components in your pipeline.
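
To make the three parts of a component specification concrete, the sketch below writes one as a Python dictionary that mirrors the layout of a component.yaml file. The component name, container image, script name, and command-line flags are hypothetical values chosen for this illustration, and the field names should be treated as an assumption to check against the component specification reference.

```python
# Illustrative component specification as a Python dict.
# All values (name, image, merge.py, flags) are hypothetical.
component_spec = {
    "name": "Merge CSV",
    "description": "Merges the CSV files in an archive into one file.",
    "inputs": [{"name": "file", "type": "Tarball"}],
    "outputs": [{"name": "output_csv", "type": "CSV"}],
    "implementation": {
        "container": {
            "image": "example.com/merge-csv:latest",
            "command": [
                "python", "merge.py",
                # Placeholders are replaced with real paths at run time.
                "--file", {"inputPath": "file"},
                "--output-csv", {"outputPath": "output_csv"},
            ],
        }
    },
}


def describe(spec):
    """Split a specification into metadata, interface, and implementation."""
    return {
        "metadata": {key: spec[key] for key in ("name", "description")},
        "interface": {key: spec[key] for key in ("inputs", "outputs")},
        "implementation": spec["implementation"]["container"],
    }


parts = describe(component_spec)
print(parts["interface"])
```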

Understanding the pipeline graph

Each step in your pipeline’s workflow is an instance of a component. When you define your pipeline, you specify the source of each step’s inputs. Step inputs can be set from the pipeline’s input arguments or from constants, or they can depend on the outputs of other steps in this pipeline. Kubeflow Pipelines uses these dependencies to define your pipeline’s workflow as a graph.

For example, consider a pipeline with the following steps: ingest data, generate statistics, preprocess data, and train a model. The following list describes the data dependencies between these steps.

  • Ingest data: This step loads data from an external source which is specified using a pipeline argument, and it outputs a dataset. Since this step does not depend on the output of any other steps, this step can run first.
  • Generate statistics: This step uses the ingested dataset to generate and output a set of statistics. Since this step depends on the dataset produced by the ingest data step, it must run after the ingest data step.
  • Preprocess data: This step preprocesses the ingested dataset and transforms the data into a preprocessed dataset. Since this step depends on the dataset produced by the ingest data step, it must run after the ingest data step.
  • Train a model: This step trains a model using the preprocessed dataset, the generated statistics, and pipeline parameters, such as the learning rate. Since this step depends on the preprocessed data and the generated statistics, it must run after both the preprocess data and generate statistics steps are complete.

Since the generate statistics and preprocess data steps both depend on the ingested data, the generate statistics and preprocess data steps can run in parallel. All other steps are executed once their data dependencies are available.
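
The ordering described above can be sketched with Python's standard graphlib module (Python 3.9 or later). This is only a model of the dependency graph, not how Kubeflow Pipelines schedules steps; the step names and the helper execution_waves are made up for this illustration.

```python
from graphlib import TopologicalSorter

# Map each step to the set of steps whose outputs it consumes.
dependencies = {
    "ingest_data": set(),
    "generate_statistics": {"ingest_data"},
    "preprocess_data": {"ingest_data"},
    "train_model": {"preprocess_data", "generate_statistics"},
}


def execution_waves(deps):
    """Group steps into waves; every step in a wave has all its inputs ready."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # steps that could run in parallel
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(dependencies)
for number, wave in enumerate(waves, start=1):
    print(number, wave)
```

The second wave contains both generate_statistics and preprocess_data, which matches the observation that these two steps can run in parallel.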

Designing your pipeline

When designing your pipeline, think about how to split your ML workflow into pipeline components. The process of splitting an ML workflow into pipeline components is similar to the process of splitting a monolithic script into testable functions. The following rules can help you define the components that you need to build your pipeline.

  • Components should have a single responsibility. Having a single responsibility makes it easier to test and reuse a component. For example, if you have a component that loads data, you can reuse it for similar tasks that load data. If you have a component that loads and transforms a dataset, the component is less useful, since you can use it only when you need to both load and transform that dataset.

  • Reuse components when possible. Kubeflow Pipelines provides components for common pipeline tasks and for access to cloud services.

  • Consider what you need to know to debug your pipeline and research the lineage of the models that your pipeline produces. Kubeflow Pipelines stores the inputs and outputs of each pipeline step. By interrogating the artifacts produced by a pipeline run, you can better understand the variations in model quality between runs or track down bugs in your workflow.

In general, you should design your components with composability in mind.

Pipelines are composed of component instances, also called steps. Steps can define their inputs as depending on the output of another step. The dependencies between steps define the pipeline workflow graph.

Building pipeline components

Kubeflow pipeline components are containerized applications that perform a step in your ML workflow. Here are the ways that you can define pipeline components:

  • If you have a containerized application that you want to use as a pipeline component, create a component specification to define this container image as a pipeline component.

    This option provides the flexibility to include code written in any language in your pipeline, so long as you can package the application as a container image. Learn more about building pipeline components.

  • If your component code can be expressed as a Python function, evaluate whether your component can be built as a Python function-based component. The Kubeflow Pipelines SDK makes it easier to build lightweight Python function-based components by saving you the effort of creating a component specification.

Whenever possible, reuse prebuilt components to save yourself the effort of building custom components.

The example in this guide demonstrates how to build a pipeline that uses a Python function-based component and reuses a prebuilt component.

Understanding how data is passed between components

When Kubeflow Pipelines runs a component, a container is started from the component’s image in a Kubernetes Pod, and your component’s inputs are passed in as command-line arguments. When your component has finished, its outputs are returned as files.

In your component’s specification, you define the component’s inputs and outputs, and how the inputs and the output paths are passed to your program as command-line arguments. You can pass small inputs, such as short strings or numbers, to your component by value. Large inputs, such as datasets, must be passed to your component as file paths. Outputs are written to the paths that Kubeflow Pipelines provides.
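
As a minimal sketch of this contract, the program below accepts one small input by value and receives an input path and an output path as command-line arguments. The flag names (--factor, --numbers-path, --scaled-path) and the file contents are hypothetical, and the temporary directory only stands in for the paths that the pipeline system would choose.

```python
import argparse
import tempfile
from pathlib import Path


def main(argv=None):
    """Hypothetical component program: multiplies each number in a file."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--factor", type=int, required=True)         # small input, by value
    parser.add_argument("--numbers-path", type=Path, required=True)  # large input, by path
    parser.add_argument("--scaled-path", type=Path, required=True)   # output, by path
    args = parser.parse_args(argv)

    numbers = [int(token) for token in args.numbers_path.read_text().split()]
    # The output directory might not exist yet, so create it before writing.
    args.scaled_path.parent.mkdir(parents=True, exist_ok=True)
    args.scaled_path.write_text(
        "\n".join(str(number * args.factor) for number in numbers))


# Local stand-in for the pipeline system: pick the paths, then build the
# command line that the program receives.
with tempfile.TemporaryDirectory() as tmp:
    numbers_path = Path(tmp) / "inputs" / "numbers.txt"
    scaled_path = Path(tmp) / "outputs" / "scaled.txt"
    numbers_path.parent.mkdir(parents=True)
    numbers_path.write_text("1\n2\n3\n")
    main(["--factor", "10",
          "--numbers-path", str(numbers_path),
          "--scaled-path", str(scaled_path)])
    result = scaled_path.read_text()

print(result)
```

The program never decides where its output lives; it only writes to the path it was given, which is what lets the caller collect the file afterwards.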

Python function-based components make it easier to build pipeline components by building the component specification for you. Python function-based components also handle the complexity of passing inputs into your component and passing your function’s outputs back to your pipeline.

Learn more about how Python function-based components handle inputs and outputs.

Getting started building a pipeline

The following sections demonstrate how to get started building a Kubeflow pipeline by walking through the process of converting a Python script into a pipeline.

Design your pipeline

The following steps walk through some of the design decisions you may face when designing a pipeline.

  1. Evaluate the process. In the following example, a Python function downloads a zipped tar file (.tar.gz) containing several CSV files from a public website. The function extracts the CSV files and then merges them into a single file.
import glob
import pandas as pd
import tarfile
import urllib.request
    
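def download_and_merge_csv(url: str, output_csv: str):
  # Stream the archive from the URL and extract its contents into ./data.
  with urllib.request.urlopen(url) as res:
    tarfile.open(fileobj=res, mode="r|gz").extractall('data')
  # Concatenate every extracted CSV file; the files have no header row.
  df = pd.concat(
      [pd.read_csv(csv_file, header=None)
       for csv_file in glob.glob('data/*.csv')])
  # Write the merged rows without an index column or a header row.
  df.to_csv(output_csv, index=False, header=False)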
  2. Run the following Python code to test the function.
download_and_merge_csv(
    url='https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz', 
    output_csv='merged_data.csv')
  3. Run the following command to print the first few rows of the merged CSV file.
$ head merged_data.csv
  4. Design your pipeline. For example, consider the following pipeline designs.

    • Implement the pipeline using a single step. In this case, the pipeline contains one component that works similarly to the example function. This is a straightforward function, and implementing a single-step pipeline is a reasonable approach in this case.

      The downside of this approach is that the zipped tar file would not be an artifact of your pipeline runs. Not having this artifact available could make it harder to debug this component in production.

    • Implement this as a two-step pipeline. The first step downloads a file from a website. The second step extracts the CSV files from a zipped tar file and merges them into a single file.

      This approach has a few benefits:

      • You can reuse the Web Download component to implement the first step.
      • Each step has a single responsibility, which makes the components easier to reuse.
      • The zipped tar file is an artifact of the first pipeline step. This means that you can examine this artifact when debugging pipelines that use this component.

    This example implements a two-step pipeline.
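
Before involving Kubeflow Pipelines, the two-step split can be sketched as two plain Python functions. To stay self-contained, this sketch swaps pandas for the standard csv module and reads the CSV files directly from the archive instead of extracting them first. The function names and the small local archive served through a file:// URL are stand-ins made up for the demonstration.

```python
import csv
import io
import shutil
import tarfile
import tempfile
import urllib.request
from pathlib import Path


def download(url, tarball_path):
    """Step 1: save the archive at `url` to `tarball_path` (the artifact)."""
    with urllib.request.urlopen(url) as res, open(tarball_path, "wb") as out:
        shutil.copyfileobj(res, out)


def merge_csv(tarball_path, output_csv):
    """Step 2: concatenate the rows of every CSV file in the archive."""
    with tarfile.open(tarball_path, mode="r:gz") as tar, \
            open(output_csv, "w", newline="") as out:
        writer = csv.writer(out)
        # Sort the members so the merged row order is deterministic.
        for member in sorted(tar.getmembers(), key=lambda m: m.name):
            if member.isfile() and member.name.endswith(".csv"):
                text = io.TextIOWrapper(tar.extractfile(member), newline="")
                writer.writerows(csv.reader(text))


# Local demonstration with a tiny archive instead of a public website.
with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    (tmp / "a.csv").write_text("1,2\n")
    (tmp / "b.csv").write_text("3,4\n")
    with tarfile.open(tmp / "source.tar.gz", "w:gz") as tar:
        tar.add(tmp / "a.csv", arcname="a.csv")
        tar.add(tmp / "b.csv", arcname="b.csv")

    download((tmp / "source.tar.gz").as_uri(), tmp / "downloaded.tar.gz")
    merge_csv(tmp / "downloaded.tar.gz", tmp / "merged.csv")
    merged = (tmp / "merged.csv").read_text()

print(merged)
```

Because the first function leaves the downloaded archive on disk, it can be inspected independently of the merge, which is the debugging benefit that the two-step design aims for.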

Build your pipeline components

  1. Build your pipeline components. This example modifies the initial script to extract the contents of a zipped tar file, merge the CSV files that were contained in the zipped tar file, and return the merged CSV file.

    This example builds a Python function-based component. You can also package your component’s code as a Docker container image and define the component using a ComponentSpec.

    In this case, the original function required the following modifications.

    • The file download logic was removed. The path to the zipped tar file is passed as an argument to this function.
    • The import statements were moved inside of the function. Python function-based components require standalone Python functions. This means that any required import statements must be defined within the function, and any helper functions must be defined within the function. Learn more about building Python function-based components.
    • The function’s arguments are annotated with kfp.components.InputPath and kfp.components.OutputPath. These annotations tell Kubeflow Pipelines to provide the path to the zipped tar file and to create a path where your function stores the merged CSV file.

    The following example shows the updated merge_csv function.

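def merge_csv(file_path: comp.InputPath('Tarball'),
              output_csv: comp.OutputPath('CSV')):
  # Imports live inside the function so that it is standalone.
  import glob
  import pandas as pd
  import tarfile

  # Extract the archive at the provided input path into ./data.
  tarfile.open(name=file_path, mode="r|gz").extractall('data')
  # Concatenate every extracted CSV file; the files have no header row.
  df = pd.concat(
      [pd.read_csv(csv_file, header=None)
       for csv_file in glob.glob('data/*.csv')])
  # Write the merged rows to the output path that Kubeflow Pipelines provides.
  df.to_csv(output_csv, index=False, header=False)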
  2. Use kfp.components.create_component_from_func to return a factory function that you can use to create pipeline steps. This example also specifies the base container image to run this function in, the path to save the component specification to, and a list of PyPI packages that need to be installed in the container at runtime.
create_step_merge_csv = kfp.components.create_component_from_func(
    func=merge_csv,
    output_component_file='component.yaml', # This is optional. It saves the component spec for future use.
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4'])
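
The requirement that imports live inside the function can be illustrated without Kubeflow Pipelines. The sketch below defines two versions of a small function from source text in an empty namespace, which loosely imitates running a function away from the module it was written in. It is only an analogy and does not reproduce how the SDK packages your function; the names mean_of and run_in_fresh_namespace are hypothetical.

```python
MODULE_LEVEL_IMPORT = """
def mean_of(values):
    return statistics.mean(values)  # relies on an import outside the function
"""

STANDALONE = """
def mean_of(values):
    import statistics  # imported inside, so the function carries its dependency
    return statistics.mean(values)
"""


def run_in_fresh_namespace(source, values):
    """Define mean_of from source in an empty namespace, then call it."""
    namespace = {}
    exec(source, namespace)
    return namespace["mean_of"](values)


standalone_result = run_in_fresh_namespace(STANDALONE, [1, 2, 3])
print("standalone:", standalone_result)

failure = None
try:
    run_in_fresh_namespace(MODULE_LEVEL_IMPORT, [1, 2, 3])
except NameError as err:
    failure = str(err)
    print("fails in isolation:", failure)
```

The version that depends on a module-level import raises NameError once it is separated from its original module, while the standalone version keeps working.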

Build your pipeline

  1. Use kfp.components.load_component_from_url to load the component specification YAML for any components that you are reusing in this pipeline.
web_downloader_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component.yaml')
  2. Define your pipeline as a Python function.

    Your pipeline function’s arguments define your pipeline’s parameters. Use pipeline parameters to experiment with different hyperparameters, such as the learning rate used to train a model, or pass run-level inputs, such as the path to an input file, into a pipeline run.

    Use the factory functions created by kfp.components.create_component_from_func and kfp.components.load_component_from_url to create your pipeline’s tasks. The inputs to the component factory functions can be pipeline parameters, the outputs of other tasks, or constant values. In this case, the web_downloader_task task uses the url pipeline parameter, and the merge_csv_task task uses the data output of the web_downloader_task task. Because the _path suffix is removed from the names of arguments annotated with InputPath, the file_path argument of merge_csv is passed to the factory function as file.

# Define a pipeline and create a task from a component:
def my_pipeline(url):
  web_downloader_task = web_downloader_op(url=url)
  merge_csv_task = create_step_merge_csv(file=web_downloader_task.outputs['data'])
  # The outputs of the merge_csv_task can be referenced using the
  # merge_csv_task.outputs dictionary: merge_csv_task.outputs['output_csv']

Compile and run your pipeline

After defining the pipeline in Python as described in the preceding section, use one of the following options to compile the pipeline and submit it to the Kubeflow Pipelines service.

Option 1: Compile and then upload in the UI

  1. Run the following to compile your pipeline and save it as pipeline.yaml.
kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.yaml')
  2. Upload and run your pipeline.yaml using the Kubeflow Pipelines user interface. See the guide to getting started with the UI.

Option 2: Run the pipeline using the Kubeflow Pipelines SDK client

  1. Create an instance of the kfp.Client class by following the steps in the guide to connecting to Kubeflow Pipelines using the SDK client.
client = kfp.Client() # Adjust the arguments for your deployment.
  2. Run the pipeline using the kfp.Client instance:
client.create_run_from_pipeline_func(
    my_pipeline,
    arguments={
        'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
    })


Last modified June 11, 2024: Fixed broken links (bb402314)