Author Tasks with Platform-Specific Functionality

One of the benefits of KFP is cross-platform portability. The KFP SDK compiles pipeline definitions to IR YAML, which different backends can read and execute, including the Kubeflow Pipelines open source backend and Vertex AI Pipelines.

For features that are not portable across platforms, users can author pipelines with platform-specific functionality by using KFP SDK platform-specific plugin libraries.

In general, platform-specific plugin libraries provide functions that take a task as an argument and configure it, much like the task-level configuration methods that the KFP SDK provides directly. Platform-specific plugin libraries may also provide pre-baked components.
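To make the distinction concrete, here is a toy model in plain Python. ToyTask, its platform_config layout, and this mount_pvc are hypothetical stand-ins, not the KFP SDK or kfp-kubernetes implementations. The sketch only shows the calling pattern: an SDK-style method configures the task it is called on, while a plugin-style function receives the task as an argument and records a platform-specific setting kept apart from the portable ones.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ToyTask:
    """Hypothetical stand-in for a KFP pipeline task (not the real class)."""
    name: str
    cpu_limit: Optional[str] = None
    # Platform-specific settings, keyed by platform name (hypothetical layout).
    platform_config: Dict[str, Any] = field(default_factory=dict)

    def set_cpu_limit(self, cpu: str) -> "ToyTask":
        # SDK-style method: a portable setting stored on the task itself.
        self.cpu_limit = cpu
        return self


def mount_pvc(task: ToyTask, pvc_name: str, mount_path: str) -> ToyTask:
    """Hypothetical plugin-style function: takes the task as an argument and
    appends a Kubernetes-only setting under the 'kubernetes' key."""
    k8s = task.platform_config.setdefault("kubernetes", {})
    k8s.setdefault("pvc_mounts", []).append(
        {"pvc_name": pvc_name, "mount_path": mount_path}
    )
    return task


task = ToyTask(name="producer").set_cpu_limit("1")
mount_pvc(task, pvc_name="my-pvc", mount_path="/data")
print(task.platform_config)
```

In this model a backend that does not understand the "kubernetes" key could still read cpu_limit, which is the portability split the plugin approach is meant to preserve.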

Example: Read/write to a Kubernetes PVC using kfp-kubernetes

Currently, the only KFP SDK platform-specific plugin library is kfp-kubernetes, which is supported by the Kubeflow Pipelines open source backend and enables direct access to some Kubernetes resources and functionality.

The following uses kfp-kubernetes to demonstrate typical usage of a plugin library. Specifically, we will use kfp-kubernetes to create a PersistentVolumeClaim (PVC), use the PVC to pass data between tasks, and delete the PVC after using it. See the kfp-kubernetes documentation for more information.

The following assumes basic familiarity with PersistentVolume and PersistentVolumeClaim concepts in Kubernetes, authoring components, and authoring pipelines.

Step 1: Install the platform-specific plugin library with the KFP SDK

pip install "kfp[kubernetes]"
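After installing, you can confirm that the plugin is importable from the same Python environment. The sketch below uses only the standard library; plugin_available is a hypothetical helper name, and kfp.kubernetes is the module path behind "from kfp import kubernetes".

```python
import importlib.util


def plugin_available(module_name: str = "kfp.kubernetes") -> bool:
    """Return True if module_name can be found in the current environment."""
    try:
        # For a dotted name, find_spec imports the parent package first.
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # The parent package (for example 'kfp') is not installed at all.
        return False


if __name__ == "__main__":
    print("kfp-kubernetes available:", plugin_available())
```

A False result with kfp installed usually means the kubernetes extra was not installed into this environment.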

Step 2: Create components that read/write to the mount path

Create two simple components that write to and read from a file. In a later step, we will mount the associated volume at the /data directory.

from kfp import dsl
from kfp import kubernetes

@dsl.component
def producer() -> str:
    with open('/data/file.txt', 'w') as file:
        file.write('Hello world')
    with open('/data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content

@dsl.component
def consumer() -> str:
    with open('/data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content
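Because the components only touch a file path, their logic can be checked locally before any Kubernetes resources are involved. This is a sketch under stated assumptions, not a KFP feature: producer_logic, consumer_logic, and run_locally are hypothetical helpers that mirror the component bodies with the directory passed in as a parameter, and a temporary directory plays the role of the shared volume.

```python
import tempfile
from pathlib import Path
from typing import Tuple


# Hypothetical plain-Python versions of the two component bodies. The
# directory is a parameter so the logic can run against a temp directory.
def producer_logic(data_dir: Path) -> str:
    file_path = data_dir / "file.txt"
    file_path.write_text("Hello world")
    return file_path.read_text()


def consumer_logic(data_dir: Path) -> str:
    return (data_dir / "file.txt").read_text()


def run_locally() -> Tuple[str, str]:
    # A single temporary directory stands in for the shared PVC.
    with tempfile.TemporaryDirectory() as shared:
        written = producer_logic(Path(shared))
        # The consumer runs strictly after the producer, mirroring the
        # ordering the pipeline must enforce between the two tasks.
        read_back = consumer_logic(Path(shared))
    return written, read_back


if __name__ == "__main__":
    print(run_locally())
```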

Step 3: Dynamically provision a PVC using CreatePVC

Now that we have our components, we can begin constructing a pipeline. First, we need a PVC to mount. We’ll use the kubernetes.CreatePVC pre-baked component to dynamically provision a PVC.

@dsl.pipeline
def my_pipeline():
    pvc1 = kubernetes.CreatePVC(
        # pass pvc_name instead of pvc_name_suffix to give the new PVC a fixed name;
        # to use a pre-existing PVC, skip CreatePVC and pass its name to mount_pvc
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteMany'],
        size='5Gi',
        storage_class_name='standard',
    )

This task provisions a 5 GiB PVC (size='5Gi') from the StorageClass 'standard' with the ReadWriteMany access mode. The PVC is named after the underlying Argo workflow that creates it: the workflow name concatenated with the suffix -my-pvc. The CreatePVC component returns this name as its output 'name'.
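For orientation, the claim requested in this step corresponds roughly to the Kubernetes PersistentVolumeClaim object built below. This is a hand-written sketch of the standard PVC fields, not the object kfp-kubernetes actually generates (which may set additional metadata), and the workflow name my-pipeline-abc12 is made up. Note that 5Gi is a binary quantity: 5 × 2^30 = 5,368,709,120 bytes.

```python
import json
from typing import Any, Dict, List


def pvc_manifest(name: str, access_modes: List[str], size: str,
                 storage_class_name: str) -> Dict[str, Any]:
    """Build a minimal PersistentVolumeClaim object (illustrative only)."""
    return {
        "apiVersion": "v1",
        "kind": "PersistentVolumeClaim",
        "metadata": {"name": name},
        "spec": {
            "accessModes": list(access_modes),
            "resources": {"requests": {"storage": size}},
            "storageClassName": storage_class_name,
        },
    }


# Hypothetical workflow name; at run time the prefix comes from the Argo workflow.
workflow_name = "my-pipeline-abc12"
manifest = pvc_manifest(
    name=workflow_name + "-my-pvc",
    access_modes=["ReadWriteMany"],
    size="5Gi",
    storage_class_name="standard",
)
print(json.dumps(manifest, indent=2))
```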

Step 4: Read and write data to the PVC

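Next, we’ll use the mount_pvc task modifier to mount the PVC into the producer and consumer tasks. The two tasks share no data dependency, so without further instructions they could run concurrently and consumer could try to read the file before producer has written it. We’ll therefore schedule task2 to run after task1.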

    # write to the PVC
    task1 = producer()
    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )

    # read from the PVC
    task2 = consumer()
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',  # must match the path that consumer reads from
    )
    task2.after(task1)

Step 5: Delete the PVC

Finally, we can schedule deletion of the PVC after task2 finishes to clean up the Kubernetes resources we created.

    delete_pvc1 = kubernetes.DeletePVC(
        pvc_name=pvc1.outputs['name']).after(task2)
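The dependencies declared across Steps 3 to 5 can be modeled as a small graph to see why task2.after(task1) matters. The sketch below uses Python's standard graphlib module (3.9+) to group tasks into batches whose dependencies are satisfied together; the task names and the batching model are illustrative assumptions, not how the Kubeflow Pipelines backend schedules tasks. Both producer and consumer depend on create-pvc through pvc1.outputs['name'], and delete-pvc depends on create-pvc and consumer.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def ready_batches(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Return groups of tasks that become runnable at the same time."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        # Mark the whole batch finished before computing the next one.
        sorter.done(*ready)
    return batches


# Each task maps to the set of tasks it must wait for.
with_after = {
    "producer": {"create-pvc"},
    "consumer": {"create-pvc", "producer"},  # includes task2.after(task1)
    "delete-pvc": {"create-pvc", "consumer"},
}
# Same graph, but without the explicit task2.after(task1) edge.
without_after = {**with_after, "consumer": {"create-pvc"}}

print(ready_batches(with_after))
print(ready_batches(without_after))
```

In the second case producer and consumer become ready in the same batch, so consumer could attempt to read /data/file.txt before it exists.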

For the full pipeline and more information, see a similar example in the kfp-kubernetes documentation.


Last modified June 11, 2024: Fixed broken links (9665cfe8)