Building Python function-based components

Building your own lightweight pipeline components using Python

A Kubeflow Pipelines component is a self-contained set of code that performs one step in your ML workflow. A pipeline component is composed of:

  • The component code, which implements the logic needed to perform a step in your ML workflow.

  • A component specification, which defines the following:

    • The component’s metadata: its name and description.
    • The component’s interface: the component’s inputs and outputs.
    • The component’s implementation: the Docker container image to run, how to pass inputs to your component code, and how to get the component’s outputs.

Python function-based components make it easier to iterate quickly by letting you build your component code as a Python function and generating the component specification for you. This document describes how to build Python function-based components and use them in your pipeline.

Before you begin

  1. Run the following command to install the Kubeflow Pipelines SDK. If you run this command in a Jupyter notebook, restart the kernel after installing the SDK.
$ pip install kfp==1.8
  1. Import the kfp package.
import kfp
from kfp.components import create_component_from_func
  1. Create an instance of the kfp.Client class by following the steps in connecting to Kubeflow Pipelines using the SDK client.
client = kfp.Client() # change arguments accordingly

For more information about the Kubeflow Pipelines SDK, see the SDK reference guide.

Getting started with Python function-based components

This section demonstrates how to get started building Python function-based components by walking through the process of creating a simple component.

  1. Define your component’s code as a standalone Python function. In this example, the function adds two floats and returns the sum of the two arguments.
def add(a: float, b: float) -> float:
  '''Calculates sum of two arguments'''
  return a + b
  1. Use kfp.components.create_component_from_func to generate the component specification YAML and return a factory function that you can use to create kfp.dsl.ContainerOp class instances for your pipeline. The component specification YAML is a reusable and shareable definition of your component.
add_op = create_component_from_func(
    add, output_component_file='add_component.yaml')
  1. Create and run your pipeline. Learn more about creating and running pipelines.
import kfp.dsl as dsl
@dsl.pipeline(
  name='Addition pipeline',
  description='An example pipeline that performs addition calculations.'
)
def add_pipeline(
  a='1',
  b='7',
):
  # Passes a pipeline parameter and a constant value to the `add_op` factory
  # function.
  first_add_task = add_op(a, 4)
  # Passes an output reference from `first_add_task` and a pipeline parameter
  # to the `add_op` factory function. For operations with a single return
  # value, the output reference can be accessed as `task.output` or
  # `task.outputs['output_name']`.
  second_add_task = add_op(first_add_task.output, b)

# Specify argument values for your pipeline run.
arguments = {'a': '7', 'b': '8'}

# Create a pipeline run, using the client you initialized in a prior step.
client.create_run_from_pipeline_func(add_pipeline, arguments=arguments)
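
Because add is an ordinary Python function, the data flow of the pipeline above can be checked locally before a run is submitted. The following is a minimal sketch, not part of the SDK: it mirrors the two tasks with direct function calls, and simulate_add_pipeline is a hypothetical name used only for illustration.

```python
def add(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    return a + b


def simulate_add_pipeline(a: float, b: float) -> float:
    """Mirrors add_pipeline locally: (a + 4) + b, with no cluster involved."""
    first_result = add(a, 4)       # stands in for first_add_task
    return add(first_result, b)    # stands in for second_add_task


# Same argument values as the run above, given as floats instead of strings.
print(simulate_add_pipeline(7.0, 8.0))
```

A check like this only exercises the function logic. It does not validate the generated component specification or the container image.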

Building Python function-based components

Use the following instructions to build a Python function-based component:

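  1. Define a standalone Python function. This function must meet the following requirements:

    • It must not use any code declared outside of the function definition.
    • Import statements must be placed inside the function.
    • Helper functions must be defined inside the function.

  2. Kubeflow Pipelines uses your function’s inputs and outputs to define your component’s interface. Learn more about passing data between components. Your function’s inputs and outputs must meet the following requirements:

    • Large amounts of data and complex data types must be passed as files.
    • Parameters that are passed by value should have type hints. The supported types are int, float, bool, and str. Parameters without a type hint are passed as strings.
    • If your component returns multiple small outputs, annotate the function with the typing.NamedTuple type hint and return the outputs using collections.namedtuple.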

  3. (Optional.) If your function has complex dependencies, choose or build a container image for your Python function to run in. Learn more about selecting or building your component’s container image.

  4. Call kfp.components.create_component_from_func(func) to convert your function into a pipeline component.

    • func: The Python function to convert.
    • base_image: (Optional.) Specify the Docker container image to run this function in. Learn more about selecting or building a container image.
    • output_component_file: (Optional.) Writes your component definition to a file. You can use this file to share the component with colleagues or reuse it in different pipelines.
    • packages_to_install: (Optional.) A list of versioned Python packages to install before running your function.
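
As a minimal sketch of step 4, the following passes each of the optional parameters listed above. The function column_mean, the file name column_mean_component.yaml, and the pinned numpy version are illustrative assumptions, not values from this guide. The kfp import is kept inside build_column_mean_op so that the block can be loaded on a machine without the SDK installed.

```python
def column_mean(values: list) -> float:
    """Returns the mean of a short list of numbers."""
    # Imports live inside the function so the component stays standalone.
    import numpy as np
    return float(np.mean(values))


# Optional keyword arguments described above; every value is illustrative.
COMPONENT_OPTIONS = {
    'base_image': 'python:3.7',
    'output_component_file': 'column_mean_component.yaml',
    'packages_to_install': ['numpy==1.21.6'],
}


def build_column_mean_op():
    """Converts column_mean into a component factory (requires the kfp SDK)."""
    from kfp.components import create_component_from_func
    return create_component_from_func(column_mean, **COMPONENT_OPTIONS)
```

Calling build_column_mean_op() returns the factory function and, because output_component_file is set, also writes the component definition to the named file.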

Using and installing Python packages

When Kubeflow Pipelines runs your pipeline, each component runs in a container, started from a Docker container image, on a Kubernetes Pod. To load the packages that your Python function depends on, one of the following must be true:

  • The package must be installed on the container image.
  • The package must be defined using the packages_to_install parameter of the kfp.components.create_component_from_func(func) function.
  • Your function must install the package. For example, your function can use the subprocess module to run a command like pip install that installs a package.
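
The third option can be sketched as follows, assuming the container image includes pip. The component function count_json_keys and its helper ensure_module are hypothetical names. The standard-library json module stands in for a third-party package so that the sketch runs without downloading anything; in a real component the two arguments would name the package you need.

```python
def count_json_keys(payload: str) -> int:
    """Counts the top-level keys of a JSON object passed as a short string."""
    import importlib
    import subprocess
    import sys

    def ensure_module(module_name: str, pip_requirement: str):
        """Imports module_name, running pip install first if it is missing."""
        try:
            return importlib.import_module(module_name)
        except ImportError:
            subprocess.run(
                [sys.executable, '-m', 'pip', 'install', pip_requirement],
                check=True)
            # Make the freshly installed package visible to the import system.
            importlib.invalidate_caches()
            return importlib.import_module(module_name)

    # json ships with Python, so no install happens here.
    json_module = ensure_module('json', 'json')
    return len(json_module.loads(payload))
```

Installing at run time adds a download to every execution, which is why the first two options are usually preferable for dependencies that rarely change.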

Selecting or building a container image

Currently, if you do not specify a container image, your Python function-based component uses the python:3.7 container image. If your function has complex dependencies, you may benefit from using a container image that has your dependencies preinstalled, or from building a custom container image. Preinstalling your dependencies shortens your component’s run time, since your component does not need to download and install packages each time it runs.

Many frameworks, such as TensorFlow and PyTorch, and cloud service providers offer prebuilt container images that have common dependencies installed.

If a prebuilt container is not available, you can build a custom container image with your Python function’s dependencies. For more information about building a custom container, read the Dockerfile reference guide in the Docker documentation.

If you build or select a container image, instead of using the default container image, the container image must use Python 3.5 or later.

Understanding how data is passed between components

When Kubeflow Pipelines runs your component, a container is started from the component’s container image in a Kubernetes Pod, and your component’s inputs are passed in as command-line arguments. When your component has finished, the component’s outputs are returned as files.

Python function-based components make it easier to build pipeline components by building the component specification for you. Python function-based components also handle the complexity of passing inputs into your component and passing your function’s outputs back to your pipeline.

The following sections describe how to pass parameters by value and by file.

  • Parameters that are passed by value include numbers, booleans, and short strings. Kubeflow Pipelines passes these values to your component directly as command-line arguments.
  • Parameters that are passed by file include CSV files, images, and complex types. These files are stored in a location that is accessible to your component running on Kubernetes, such as a persistent volume claim or a cloud storage service. Kubeflow Pipelines passes the paths of these files to your component as command-line arguments.
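
To make the distinction concrete, here is a sketch of what a component program sees from the inside. It is not the wrapper that the SDK generates: the flag names (--a, --b, --sum-output-path), the directory layout, and the function names are all assumptions chosen for illustration.

```python
import argparse
import os
import tempfile


def run_component(argv):
    """Hypothetical component program: two values in, one output file out."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--a', type=float, required=True)    # passed by value
    parser.add_argument('--b', type=float, required=True)    # passed by value
    parser.add_argument('--sum-output-path', required=True)  # passed by file
    args = parser.parse_args(argv)

    # The output is written to the path that was chosen for this run.
    os.makedirs(os.path.dirname(args.sum_output_path), exist_ok=True)
    with open(args.sum_output_path, 'w') as writer:
        writer.write(str(args.a + args.b))


def demo():
    """Runs the program against a temporary directory and reads the output."""
    with tempfile.TemporaryDirectory() as workdir:
        output_path = os.path.join(workdir, 'outputs', 'sum', 'data')
        run_component(
            ['--a', '1.5', '--b', '2', '--sum-output-path', output_path])
        with open(output_path) as reader:
            return reader.read()
```

The values 1.5 and 2 travel on the command line itself, while the sum travels through a file whose path is the only thing on the command line.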

Input and output parameter names

When you use the Kubeflow Pipelines SDK to convert your Python function to a pipeline component, the Kubeflow Pipelines SDK uses the function’s interface to define the interface of your component in the following ways:

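  • Arguments define input parameters, unless they are annotated as outputs.
  • Arguments that are annotated as outputs, for example with kfp.components.OutputPath, define output parameters.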
  • The function’s return value is used as an output parameter. If the return value is a collections.namedtuple, the named tuple is used to return several small values.

Since you can pass parameters between components as a value or as a path, the Kubeflow Pipelines SDK removes common parameter suffixes that leak the component’s expected implementation. For example, a Python function-based component that ingests data and outputs CSV data may have an output argument that is defined as csv_path: comp.OutputPath(str). In this case, the output is the CSV data, not the path. So, the Kubeflow Pipelines SDK simplifies the output name to csv.

The Kubeflow Pipelines SDK uses the following rules to define the input and output parameter names in your component’s interface:

  • If the argument name ends with _path and the argument is annotated as a kfp.components.InputPath or kfp.components.OutputPath, the parameter name is the argument name with the trailing _path removed.
  • If the argument name ends with _file, the parameter name is the argument name with the trailing _file removed.
  • If you return a single small value from your component using the return statement, the output parameter is named output.
  • If you return several small values from your component by returning a collections.namedtuple, the Kubeflow Pipelines SDK uses the tuple’s field names as the output parameter names.

Otherwise, the Kubeflow Pipelines SDK uses the argument name as the parameter name.
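
The argument-name rules above can be restated as a small function. This is a sketch for illustration only: parameter_name is a hypothetical helper that follows the rules as they are worded on this page, and it is not the SDK’s implementation. The rules for return values (output and the named tuple’s field names) are not covered.

```python
def parameter_name(argument_name: str, annotated_as_path: bool = False) -> str:
    """Applies the argument-name rules listed above (illustrative only)."""
    # Rule 1: strip _path only when the argument is annotated as
    # InputPath or OutputPath.
    if argument_name.endswith('_path') and annotated_as_path:
        return argument_name[:-len('_path')]
    # Rule 2: strip a trailing _file.
    if argument_name.endswith('_file'):
        return argument_name[:-len('_file')]
    # Otherwise the argument name is used unchanged.
    return argument_name


print(parameter_name('csv_path', annotated_as_path=True))
print(parameter_name('epochs'))
```

With these rules, an argument declared as csv_path with an OutputPath annotation is exposed to the pipeline as csv, while an unannotated csv_path keeps its full name.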

Passing parameters by value

Python function-based components make it easier to pass parameters between components by value (such as numbers, booleans, and short strings), by letting you define your component’s interface by annotating your Python function. The supported types are int, float, bool, and str. You can also pass list or dict instances by value, if they contain small values, such as int, float, bool, or str values. If you do not annotate your function, these input parameters are passed as strings.

If your component returns multiple outputs by value, annotate your function with the typing.NamedTuple type hint and use the collections.namedtuple function to return your function’s outputs as a new subclass of tuple.

You can also return metadata and metrics from your function.

The following example demonstrates how to return multiple outputs by value, including component metadata and metrics.

from typing import NamedTuple
def multiple_return_values_example(a: float, b: float) -> NamedTuple(
  'ExampleOutputs',
  [
    ('sum', float),
    ('product', float),
    ('mlpipeline_ui_metadata', 'UI_metadata'),
    ('mlpipeline_metrics', 'Metrics')
  ]):
  """Example function that demonstrates how to return multiple values."""  
  sum_value = a + b
  product_value = a * b

  # Export a sample tensorboard
  metadata = {
    'outputs' : [{
      'type': 'tensorboard',
      'source': 'gs://ml-pipeline-dataset/tensorboard-train',
    }]
  }

  # Export two metrics
  metrics = {
    'metrics': [
      {
        'name': 'sum',
        'numberValue':  float(sum_value),
      },{
        'name': 'product',
        'numberValue':  float(product_value),
      }
    ]  
  }

  from collections import namedtuple
  import json
  example_output = namedtuple(
      'ExampleOutputs',
      ['sum', 'product', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
  # Serialize the metadata and metrics dictionaries to JSON strings.
  return example_output(
      sum_value, product_value, json.dumps(metadata), json.dumps(metrics))

Passing parameters by file

Python function-based components make it easier to pass files to your component, or to return files from your component, by letting you annotate your Python function’s parameters to specify which parameters refer to a file. Your Python function’s parameters can refer to either input or output files. If your parameter is an output file, Kubeflow Pipelines passes your function a path or stream that you can use to store your output file.

The following example accepts a file as an input and returns two files as outputs.

import kfp.components as comp

def split_text_lines(
    source_path: comp.InputPath(str),
    odd_lines_path: comp.OutputPath(str),
    even_lines_path: comp.OutputPath(str)):
    """Splits a text file into two files, with even lines going to one file
    and odd lines to the other."""

    with open(source_path, 'r') as reader:
        with open(odd_lines_path, 'w') as odd_writer:
            with open(even_lines_path, 'w') as even_writer:
                while True:
                    line = reader.readline()
                    if line == "":
                        break
                    odd_writer.write(line)
                    line = reader.readline()
                    if line == "":
                        break
                    even_writer.write(line)

In this example, the inputs and outputs are defined as parameters of the split_text_lines function. This lets Kubeflow Pipelines pass the path to the source data file and the paths to the output data files into the function.
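
Since a path-based function only ever sees ordinary file paths, its logic can be exercised locally with temporary files. The sketch below assumes nothing about the SDK: split_lines_local is a hypothetical stand-in that uses plain str annotations and an equivalent loop, and run_split_demo is a hypothetical helper that supplies the paths Kubeflow Pipelines would otherwise provide.

```python
import os
import tempfile


def split_lines_local(source_path: str, odd_lines_path: str,
                      even_lines_path: str) -> None:
    """Writes lines 1, 3, 5, ... to one file and lines 2, 4, 6, ... to another."""
    with open(source_path, 'r') as reader, \
            open(odd_lines_path, 'w') as odd_writer, \
            open(even_lines_path, 'w') as even_writer:
        for index, line in enumerate(reader):
            # index 0 is the first line, which counts as an odd line.
            writer = odd_writer if index % 2 == 0 else even_writer
            writer.write(line)


def run_split_demo(text: str):
    """Runs split_lines_local on temporary files and returns both outputs."""
    with tempfile.TemporaryDirectory() as workdir:
        source = os.path.join(workdir, 'source.txt')
        odd = os.path.join(workdir, 'odd.txt')
        even = os.path.join(workdir, 'even.txt')
        with open(source, 'w') as writer:
            writer.write(text)
        split_lines_local(source, odd, even)
        with open(odd) as odd_reader, open(even) as even_reader:
            return odd_reader.read(), even_reader.read()
```

In a pipeline run, the three paths are supplied for you, so the function body never needs to know where the files are stored.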

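To accept a file as an input parameter, use one of the following type annotations:

  • kfp.components.InputPath: your function receives the path of the input file.
  • kfp.components.InputTextFile: your function receives the input as a text-mode file object.
  • kfp.components.InputBinaryFile: your function receives the input as a binary-mode file object.

To return a file as an output, use one of the following type annotations:

  • kfp.components.OutputPath: your function receives a path to write the output file to.
  • kfp.components.OutputTextFile: your function receives a text-mode file object to write to.
  • kfp.components.OutputBinaryFile: your function receives a binary-mode file object to write to.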

Example Python function-based component

This section demonstrates how to build a Python function-based component that uses imports and helper functions and produces multiple outputs.

  1. Define your function. This example function uses the numpy package to calculate the quotient and remainder for a given dividend and divisor in a helper function. In addition to the quotient and remainder, the function also returns metadata for visualization and two metrics.
from typing import NamedTuple

def my_divmod(
  dividend: float,
  divisor: float) -> NamedTuple(
    'MyDivmodOutput',
    [
      ('quotient', float),
      ('remainder', float),
      ('mlpipeline_ui_metadata', 'UI_metadata'),
      ('mlpipeline_metrics', 'Metrics')
    ]):
    '''Divides two numbers and calculates the quotient and remainder'''

    # Import the numpy package inside the component function
    import numpy as np

    # Define a helper function
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    import json

    # Export a sample tensorboard
    metadata = {
      'outputs' : [{
        'type': 'tensorboard',
        'source': 'gs://ml-pipeline-dataset/tensorboard-train',
      }]
    }

    # Export two metrics
    metrics = {
      'metrics': [{
          'name': 'quotient',
          'numberValue':  float(quotient),
        },{
          'name': 'remainder',
          'numberValue':  float(remainder),
        }]}

    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput',
        ['quotient', 'remainder', 'mlpipeline_ui_metadata',
         'mlpipeline_metrics'])
    return divmod_output(quotient, remainder, json.dumps(metadata),
                         json.dumps(metrics))
  1. Test your function by running it directly, or with unit tests.
my_divmod(100, 7)
  1. This should return a result like the following:

    MyDivmodOutput(quotient=14, remainder=2, mlpipeline_ui_metadata='{"outputs": [{"type": "tensorboard", "source": "gs://ml-pipeline-dataset/tensorboard-train"}]}', mlpipeline_metrics='{"metrics": [{"name": "quotient", "numberValue": 14.0}, {"name": "remainder", "numberValue": 2.0}]}')
    
  2. Use kfp.components.create_component_from_func to return a factory function that you can use to create kfp.dsl.ContainerOp class instances for your pipeline. This example also specifies the base container image to run this function in.

divmod_op = create_component_from_func(
    my_divmod, base_image='tensorflow/tensorflow:1.11.0-py3')
  1. Define your pipeline. This example uses the divmod_op factory function and the add_op factory function from an earlier example.
import kfp.dsl as dsl
@dsl.pipeline(
   name='Calculation pipeline',
   description='An example pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
   a='1',
   b='7',
   c='17',
):
    # Passes a pipeline parameter and a constant value as operation arguments.
    add_task = add_op(a, 4) # The add_op factory function returns
                            # a dsl.ContainerOp class instance. 

    # Passes the output of the add_task and a pipeline parameter as operation
    # arguments. For an operation with a single return value, the output
    # reference is accessed using `task.output` or
    # `task.outputs['output_name']`.
    divmod_task = divmod_op(add_task.output, b)

    # For an operation with multiple return values, output references are
    # accessed as `task.outputs['output_name']`.
    result_task = add_op(divmod_task.outputs['quotient'], c)
  1. Compile and run your pipeline. Learn more about compiling and running pipelines.
# Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}

# Submit a pipeline run
client.create_run_from_pipeline_func(calc_pipeline, arguments=arguments)
