Create pipelines with control flow
Although a KFP pipeline decorated with the `@dsl.pipeline` decorator looks like a normal Python function, it is actually an expression of pipeline topology and control flow semantics, constructed using the KFP domain-specific language (DSL). Pipeline Basics covered how data passing expresses pipeline topology through task dependencies. This section describes how to use control flow in your pipelines using the KFP DSL. The DSL features three types of control flow, each implemented by a Python context manager:
- Conditions
- Looping
- Exit handling
Conditions (dsl.If, dsl.Elif, dsl.Else)
The `dsl.If` context manager enables conditional execution of tasks within its scope based on the output of an upstream task or a pipeline input parameter. The context manager takes two arguments: a required `condition` and an optional `name`. The `condition` is a comparative expression in which at least one of the two operands is an output from an upstream task or a pipeline input parameter.

In the following pipeline, `conditional_task` only executes if `coin_flip_task` has the output `'heads'`:
```python
from kfp import dsl

@dsl.pipeline
def my_pipeline():
    coin_flip_task = flip_coin()

    with dsl.If(coin_flip_task.output == 'heads'):
        conditional_task = my_comp()
```
You may also use the `dsl.Elif` and `dsl.Else` context managers immediately downstream of `dsl.If` for additional conditional control flow functionality:
```python
from kfp import dsl

@dsl.pipeline
def my_pipeline():
    coin_flip_task = flip_three_sided_coin()

    with dsl.If(coin_flip_task.output == 'heads'):
        print_comp(text='Got heads!')

    with dsl.Elif(coin_flip_task.output == 'tails'):
        print_comp(text='Got tails!')

    with dsl.Else():
        print_comp(text='Draw!')
```
Deprecated
`dsl.Condition` is deprecated in favor of the functionally identical `dsl.If`, which is more concise, Pythonic, and consistent with the `dsl.Elif` and `dsl.Else` objects.

dsl.OneOf
`dsl.OneOf` can be used to gather outputs from mutually exclusive branches into a single task output, which can be consumed by a downstream task or returned from the pipeline. Branches are mutually exclusive if exactly one of them will execute. To enforce this, the KFP SDK compiler requires that `dsl.OneOf` consume from tasks within a logically associated group of conditional branches and that one of those branches is a `dsl.Else` branch.
```python
from kfp import dsl

@dsl.pipeline
def my_pipeline() -> str:
    coin_flip_task = flip_three_sided_coin()

    with dsl.If(coin_flip_task.output == 'heads'):
        t1 = print_and_return(text='Got heads!')
    with dsl.Elif(coin_flip_task.output == 'tails'):
        t2 = print_and_return(text='Got tails!')
    with dsl.Else():
        t3 = print_and_return(text='Draw!')

    oneof = dsl.OneOf(t1.output, t2.output, t3.output)
    announce_result(oneof)
    return oneof
```
You should provide task outputs to the `dsl.OneOf` using `.output` or `.outputs[<key>]`, just as you would pass an output to a downstream task. The outputs provided to `dsl.OneOf` must be of the same type and cannot be other instances of `dsl.OneOf` or `dsl.Collected`.
Not yet supported
`dsl.OneOf` is not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends.
Parallel looping (dsl.ParallelFor)
The `dsl.ParallelFor` context manager allows parallel execution of tasks over a static set of items. The context manager takes three arguments: a required `items`, an optional `parallelism`, and an optional `name`. `items` is the static set of items to loop over, and `parallelism` is the maximum number of concurrent iterations permitted while executing the `dsl.ParallelFor` group. `parallelism=0` indicates unconstrained parallelism.
In the following pipeline, `train_model` will train a model for 1, 5, 10, and 25 epochs, with no more than two training tasks running at one time:

```python
from kfp import dsl

@dsl.pipeline
def my_pipeline():
    with dsl.ParallelFor(
        items=[1, 5, 10, 25],
        parallelism=2
    ) as epochs:
        train_model(epochs=epochs)
```
Not yet supported
Setting `parallelism` is not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends. You can track support for this feature via the GitHub issue.
dsl.Collected
Use `dsl.Collected` with `dsl.ParallelFor` to gather outputs from a parallel loop of tasks:

```python
from kfp import dsl

@dsl.pipeline
def my_pipeline():
    with dsl.ParallelFor(
        items=[1, 5, 10, 25],
    ) as epochs:
        train_model_task = train_model(epochs=epochs)

    max_accuracy(models=dsl.Collected(train_model_task.outputs['model']))
```
Downstream tasks consume `dsl.Collected` outputs via an input annotated with a `List` of parameters or a `List` of artifacts. For example, `max_accuracy` in the preceding example has the input `models` with type `Input[List[Model]]`, as shown by the following component definition:

```python
from typing import List

from kfp import dsl
from kfp.dsl import Input, Model

@dsl.component
def max_accuracy(models: Input[List[Model]]) -> float:
    return max(score_model(model) for model in models)
```
You can use `dsl.Collected` to collect outputs from nested loops in a nested list of parameters. For example, output parameters from two nested `dsl.ParallelFor` groups are collected in a multilevel nested list of parameters, where each nested list contains the output parameters from one of the `dsl.ParallelFor` groups. The number of nested levels is based on the number of nested `dsl.ParallelFor` contexts.

By comparison, artifacts created in nested loops are collected in a flat list.
You can also return a `dsl.Collected` output from a pipeline. Use a `List` of parameters or a `List` of artifacts in the return annotation, as shown in the following example:

```python
from typing import List

from kfp import dsl
from kfp.dsl import Model

@dsl.pipeline
def my_pipeline() -> List[Model]:
    with dsl.ParallelFor(
        items=[1, 5, 10, 25],
    ) as epochs:
        train_model_task = train_model(epochs=epochs)

    return dsl.Collected(train_model_task.outputs['model'])
```
Not yet supported
`dsl.Collected` is not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends. You can track support for this feature via the GitHub issue.
Exit handling (dsl.ExitHandler)
The `dsl.ExitHandler` context manager allows pipeline authors to specify an exit task that runs after the tasks within the context manager's scope finish execution, even if one of those tasks fails. This is analogous to a `try:` block followed by a `finally:` block in normal Python, where the exit task sits in the `finally:` block. The context manager takes two arguments: a required `exit_task` and an optional `name`. `exit_task` accepts an instantiated `PipelineTask`.

In the following pipeline, `clean_up_task` will execute after both `create_datasets` and `train_and_save_models` finish, even if one of them fails:
```python
from kfp import dsl

@dsl.pipeline
def my_pipeline():
    clean_up_task = clean_up_resources()

    with dsl.ExitHandler(exit_task=clean_up_task):
        dataset_task = create_datasets()
        train_task = train_and_save_models(dataset=dataset_task.output)
```
The task you use as an exit task may use a special input that provides access to pipeline and task status metadata, including pipeline failure or success status. To use this special input, annotate a parameter of your exit task with `dsl.PipelineTaskFinalStatus`. The backend provides the argument for this parameter automatically at runtime; you should not provide a value for it when you instantiate the exit task.

The following pipeline uses `dsl.PipelineTaskFinalStatus` to obtain information about the pipeline and task failure, even after `fail_op` fails:
```python
from kfp import dsl
from kfp.dsl import PipelineTaskFinalStatus


@dsl.component
def exit_op(user_input: str, status: PipelineTaskFinalStatus):
    """Prints pipeline run status."""
    print(user_input)
    print('Pipeline status: ', status.state)
    print('Job resource name: ', status.pipeline_job_resource_name)
    print('Pipeline task name: ', status.pipeline_task_name)
    print('Error code: ', status.error_code)
    print('Error message: ', status.error_message)


@dsl.component
def fail_op():
    import sys
    sys.exit(1)


@dsl.pipeline
def my_pipeline():
    print_op()
    print_status_task = exit_op(user_input='Task execution status:')

    with dsl.ExitHandler(exit_task=print_status_task):
        fail_op()
```
Ignore upstream failure
The `.ignore_upstream_failure()` task method on `PipelineTask` enables another approach to authoring pipelines with exit handling behavior. Calling this method on a task causes the task to ignore failures of any specified upstream tasks (as established by data exchange or by use of `.after()`). If the task has no upstream tasks, this method has no effect.

In the following pipeline definition, `clean_up_task` is executed after `fail_op`, regardless of whether `fail_op` succeeds:
```python
from kfp import dsl

@dsl.pipeline()
def my_pipeline(text: str = 'message'):
    task = fail_op(message=text)
    clean_up_task = print_op(
        message=task.output).ignore_upstream_failure()
```
Note that the component used for the caller task (`print_op` in the example above) requires a default value for all inputs it consumes from an upstream task. The default value is applied if the upstream task fails to produce the outputs that are passed to the caller task. Specifying default values ensures that the caller task always succeeds, regardless of the status of the upstream task.