Author Tasks with Platform-Specific Functionality
One of the benefits of KFP is cross-platform portability. The KFP SDK compiles pipeline definitions to IR YAML, which can be read and executed by different backends, including the Kubeflow Pipelines open source backend and Vertex AI Pipelines.
For cases where features are not portable across platforms, users may author pipelines with platform-specific functionality via KFP SDK platform-specific plugin libraries.
In general, platform-specific plugin libraries provide functions that act on tasks similarly to task-level configuration methods provided by the KFP SDK directly. Platform-specific plugin libraries may also provide pre-baked components.
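To make the difference between the two calling styles concrete, here is a toy model. KFP's real `PipelineTask` has configuration methods such as `set_cpu_limit`, and kfp-kubernetes functions such as `kubernetes.mount_pvc` take the task as their first argument. Everything else below (`ToyTask`, `toy_mount_pvc`, and the way settings are stored) is hypothetical. It only illustrates the idea of keeping portable settings apart from platform-specific ones and is not how the KFP SDK stores them internally.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ToyTask:
    """Hypothetical stand-in for a pipeline task (not KFP's PipelineTask)."""
    name: str
    # Settings any backend could honor (the "portable" part).
    portable: Dict[str, Any] = field(default_factory=dict)
    # Settings grouped by platform; only a backend for that platform uses them.
    platform: Dict[str, Dict[str, Any]] = field(default_factory=dict)

    def set_cpu_limit(self, cpu: str) -> "ToyTask":
        # SDK-style configuration: a method called on the task itself.
        self.portable["cpu_limit"] = cpu
        return self


def toy_mount_pvc(task: ToyTask, pvc_name: str, mount_path: str) -> ToyTask:
    # Plugin-style configuration: a function that receives the task and
    # records its settings under a platform-specific key.
    mounts = task.platform.setdefault("kubernetes", {}).setdefault("pvc_mounts", [])
    mounts.append({"pvc_name": pvc_name, "mount_path": mount_path})
    return task


task = ToyTask(name="producer").set_cpu_limit("1")
toy_mount_pvc(task, pvc_name="my-pvc", mount_path="/data")
print(task.portable)  # {'cpu_limit': '1'}
print(task.platform)  # {'kubernetes': {'pvc_mounts': [{'pvc_name': 'my-pvc', 'mount_path': '/data'}]}}
```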
Example: Read/write to a Kubernetes PVC using kfp-kubernetes
Currently the only KFP SDK platform-specific plugin library is kfp-kubernetes, which is supported by the Kubeflow Pipelines open source backend and enables direct access to some Kubernetes resources and functionality.
The following uses kfp-kubernetes to demonstrate typical usage of a plugin library. Specifically, we will use kfp-kubernetes to create a PersistentVolumeClaim (PVC), use the PVC to pass data between tasks, and delete the PVC after using it. See the kfp-kubernetes documentation for more information.
The following assumes basic familiarity with PersistentVolume and PersistentVolumeClaim concepts in Kubernetes, authoring components, and authoring pipelines.
Step 1: Install the platform-specific plugin library with the KFP SDK
pip install "kfp[kubernetes]"
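The plugin is imported as `from kfp import kubernetes`. As a quick sanity check after installing, the following sketch asks Python whether that module can be found, without importing the plugin itself. The helper name `kfp_kubernetes_installed` is hypothetical.

```python
import importlib.util


def kfp_kubernetes_installed() -> bool:
    """Return True if the kfp.kubernetes plugin module can be found."""
    try:
        # find_spec on a dotted name imports the parent package (kfp) first,
        # which raises ImportError (ModuleNotFoundError) if kfp is missing.
        return importlib.util.find_spec("kfp.kubernetes") is not None
    except ImportError:
        return False


print("kfp-kubernetes installed:", kfp_kubernetes_installed())
```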
Step 2: Create components that read/write to the mount path
Create two simple components that read and write to a file. In a later step, we will mount the associated volume to the /data directory.
from kfp import dsl


@dsl.component
def producer() -> str:
    # Write a file into the directory where the PVC will be mounted.
    with open('/data/file.txt', 'w') as file:
        file.write('Hello world')
    # Read it back to confirm the write succeeded.
    with open('/data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content


@dsl.component
def consumer() -> str:
    # Read the file that producer wrote to the shared PVC.
    with open('/data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content
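Because the components only touch the file system, their logic can be exercised locally before running the pipeline. The sketch below copies the two function bodies into plain Python functions; the names `producer_logic` and `consumer_logic` and the `data_dir` parameter are hypothetical additions so that a temporary directory can stand in for the `/data` mount. It also shows why ordering matters: reading before the file has been written fails.

```python
import os
import tempfile


def producer_logic(data_dir: str) -> str:
    # Same steps as producer(), but rooted at data_dir instead of /data.
    path = os.path.join(data_dir, "file.txt")
    with open(path, "w") as file:
        file.write("Hello world")
    with open(path, "r") as file:
        content = file.read()
    print(content)
    return content


def consumer_logic(data_dir: str) -> str:
    # Same steps as consumer(), but rooted at data_dir instead of /data.
    with open(os.path.join(data_dir, "file.txt"), "r") as file:
        content = file.read()
    print(content)
    return content


with tempfile.TemporaryDirectory() as shared_dir:
    # The temporary directory plays the role of the PVC both tasks mount.
    written = producer_logic(shared_dir)
    read_back = consumer_logic(shared_dir)

with tempfile.TemporaryDirectory() as empty_dir:
    # Running the consumer first fails: file.txt does not exist yet.
    try:
        consumer_logic(empty_dir)
        consumer_failed = False
    except FileNotFoundError:
        consumer_failed = True
```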
Step 3: Dynamically provision a PVC using CreatePVC
Now that we have our components, we can begin constructing a pipeline. First, we need a PVC to mount. We’ll use the kubernetes.CreatePVC pre-baked component to dynamically provision a PVC.
from kfp import kubernetes


@dsl.pipeline
def my_pipeline():
    pvc1 = kubernetes.CreatePVC(
        # alternatively, use pvc_name instead of pvc_name_suffix to set the PVC's full name
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteMany'],
        size='5Gi',
        storage_class_name='standard',
    )
This component provisions a 5 GiB PVC from the StorageClass 'standard' with the ReadWriteMany access mode, which allows the volume to be mounted read-write by pods on multiple nodes. The PVC will be named after the underlying Argo workflow that creates it, concatenated with the suffix -my-pvc. The CreatePVC component returns this name as the output 'name'.
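The sketch below works through the two values in that paragraph. The workflow name `my-pipeline-x7k2p` is made up (the real one is generated by the backend at run time), and both helpers are hypothetical. Kubernetes requires PVC names to be valid DNS subdomain names (at most 253 characters of lowercase alphanumerics, `-` or `.`, starting and ending with an alphanumeric), and the `Gi` suffix in `size='5Gi'` is a binary unit (2^30 bytes), so the request is slightly larger than 5 GB. The quantity parser handles only a few binary suffixes, not the full Kubernetes quantity syntax.

```python
import re

# One or more dot-separated DNS labels, as required for Kubernetes object names.
_DNS_SUBDOMAIN = re.compile(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*")


def expected_pvc_name(workflow_name: str, pvc_name_suffix: str) -> str:
    """Concatenate a workflow name with the suffix and validate the result."""
    name = workflow_name + pvc_name_suffix
    if len(name) > 253 or not _DNS_SUBDOMAIN.fullmatch(name):
        raise ValueError(f"not a valid PVC name: {name!r}")
    return name


_BINARY_SUFFIXES = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}


def quantity_to_bytes(quantity: str) -> int:
    """Convert a binary-suffixed quantity such as '5Gi' to bytes (subset only)."""
    for suffix, factor in _BINARY_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    raise ValueError(f"unsupported quantity: {quantity!r}")


print(expected_pvc_name("my-pipeline-x7k2p", "-my-pvc"))  # my-pipeline-x7k2p-my-pvc
print(quantity_to_bytes("5Gi"))  # 5368709120
```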
Step 4: Read and write data to the PVC
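Next, we’ll use the mount_pvc task modifier with the producer and consumer components. We’ll also schedule task2 to run after task1. The two tasks share data only through the PVC, not through task inputs and outputs, so there is no data dependency from which to infer their order; without .after, the consumer could run before, or at the same time as, the producer.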
    # write to the PVC
    task1 = producer()
    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )

    # read from the PVC; mount at /data because consumer reads /data/file.txt
    task2 = consumer()
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )
    task2.after(task1)
Step 5: Delete the PVC
Finally, we can schedule deletion of the PVC after task2 finishes to clean up the Kubernetes resources we created.
    delete_pvc1 = kubernetes.DeletePVC(
        pvc_name=pvc1.outputs['name']
    ).after(task2)
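The ordering in this pipeline can be modeled as a dependency graph: data dependencies (every task that consumes `pvc1.outputs['name']` waits for CreatePVC) plus the explicit `.after` edges. The sketch below is a toy model using Python's standard `graphlib` module (Python 3.9+), not the KFP backend's scheduler, and the task names are hypothetical. It shows that without `task2.after(task1)` both the producer and the consumer become ready as soon as the PVC exists.

```python
from graphlib import TopologicalSorter
from typing import Dict, Set


def build_graph(consumer_after_producer: bool) -> Dict[str, Set[str]]:
    # Each task maps to the set of tasks it must wait for.
    graph = {
        "create-pvc": set(),
        # Data dependencies: both tasks consume the PVC name.
        "producer": {"create-pvc"},
        "consumer": {"create-pvc"},
        # DeletePVC consumes the PVC name and is ordered after task2.
        "delete-pvc": {"create-pvc", "consumer"},
    }
    if consumer_after_producer:
        # Models task2.after(task1): no data flows between these two tasks.
        graph["consumer"].add("producer")
    return graph


def ready_once_pvc_exists(graph: Dict[str, Set[str]]) -> Set[str]:
    """Return the tasks that may start as soon as create-pvc has finished."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    sorter.done(*sorter.get_ready())  # only create-pvc is ready at first
    return set(sorter.get_ready())


print(sorted(ready_once_pvc_exists(build_graph(False))))  # ['consumer', 'producer']
print(sorted(ready_once_pvc_exists(build_graph(True))))   # ['producer']
print(list(TopologicalSorter(build_graph(True)).static_order()))
# ['create-pvc', 'producer', 'consumer', 'delete-pvc']
```

With the explicit edge the graph becomes a single chain, so there is exactly one valid execution order.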
For the full pipeline and more information, see a similar example in the kfp-kubernetes documentation.