Deploying Kubeflow Pipelines on OpenShift AI

Build and deploy a reproducible training pipeline on OpenShift AI using Kubeflow Pipelines v2 — from component authoring to scheduled runs.

What Kubeflow Pipelines gives you

Kubeflow Pipelines (KFP) turns Python functions into containerized, reproducible, schedulable workflow steps. Each component runs in its own container, and a pipeline chains components into a directed acyclic graph (DAG). Every run is recorded with its parameters and artifacts and is visible in the KFP UI, and uploaded pipelines are versioned.
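To make the DAG idea concrete, here is a minimal sketch using only the Python standard library, not KFP itself. The step names are hypothetical; the point is that declaring what each step depends on is enough to derive a valid execution order, which is roughly what the pipeline backend does with component inputs and outputs.

```python
from graphlib import TopologicalSorter

# Hypothetical step names. Each key maps to the set of steps it depends on.
dependencies = {
    "preprocess": set(),
    "train": {"preprocess"},
    "evaluate": {"train"},
    "report": {"train", "evaluate"},
}


def execution_order(deps):
    """Return one valid order in which the steps could run."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

A cycle (for example, making "preprocess" depend on "report") raises graphlib.CycleError, which is the same reason a pipeline graph has to be acyclic.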

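On Red Hat OpenShift AI (RHOAI, formerly Red Hat OpenShift Data Science), KFP ships as the managed data science pipelines component: you configure a pipeline server in your project instead of installing Kubeflow separately.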

Install the KFP SDK

pip install kfp==2.7.0
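Because the v1 and v2 SDKs are not interchangeable, it can help to confirm which one the current environment actually resolves. The following is a small sketch using the standard library; the helper names major_version and kfp_is_v2 are illustrative and not part of KFP.

```python
from importlib import metadata


def major_version(version: str) -> int:
    """Extract the major number from a version string such as '2.7.0'."""
    return int(version.split(".")[0])


def kfp_is_v2() -> bool:
    """Return True only if a kfp distribution with major version 2 is installed."""
    try:
        return major_version(metadata.version("kfp")) == 2
    except metadata.PackageNotFoundError:
        return False


print("KFP v2 SDK installed:", kfp_is_v2())
```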

Define a component

Components are Python functions decorated with @dsl.component:

from kfp import dsl
from kfp.dsl import Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11-slim",
    # s3fs lets pandas read s3:// paths; the pod still needs S3 credentials
    packages_to_install=["scikit-learn", "pandas", "s3fs"],
)
def preprocess_data(
    raw_data_path: str,
    processed_data: Output[Dataset],
) -> None:
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_csv(raw_data_path)
    scaler = StandardScaler()
    df[["f1", "f2"]] = scaler.fit_transform(df[["f1", "f2"]])
    df.to_csv(processed_data.path, index=False)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["scikit-learn", "pandas"],
)
def train_model(
    processed_data: dsl.Input[Dataset],
    model_output: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
) -> None:
    import pandas as pd
    import pickle
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(processed_data.path)
    X = df.drop("label", axis=1)
    y = df["label"]
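    # Fixed seeds keep the split and the forest reproducible across runs
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    clf = RandomForestClassifier(n_estimators=n_estimators, random_state=42)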
    clf.fit(X_train, y_train)
    accuracy = clf.score(X_test, y_test)

    metrics.log_metric("accuracy", accuracy)

    with open(model_output.path, "wb") as f:
        pickle.dump(clf, f)
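For readers unfamiliar with the StandardScaler step in preprocess_data, the sketch below shows the arithmetic it applies to each column: subtract the mean, then divide by the population standard deviation. This is a standard-library illustration of the formula, not scikit-learn's implementation, and the function name standardize is made up for this example.

```python
from statistics import fmean, pstdev


def standardize(values):
    """Scale a column to zero mean and unit (population) variance."""
    mean = fmean(values)
    std = pstdev(values)
    if std == 0:
        # A constant column carries no spread; map every value to 0.0
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]


scaled = standardize([2.0, 4.0, 6.0])
print(scaled)
```

Testing the numeric logic like this, outside a container, is usually faster than debugging it through a full pipeline run.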

Compose the pipeline

@dsl.pipeline(
    name="fraud-detection-training",
    description="Preprocess data and train a fraud detection model",
)
def fraud_pipeline(
    raw_data_path: str = "s3://my-bucket/data/raw.csv",
    n_estimators: int = 100,
):
    preprocess_task = preprocess_data(raw_data_path=raw_data_path)

    train_task = train_model(
        processed_data=preprocess_task.outputs["processed_data"],
        n_estimators=n_estimators,
    )
    # No explicit .after() is needed: consuming preprocess_task's output
    # already makes train_task run after it.

Compile and upload

from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=fraud_pipeline,
    package_path="fraud_pipeline.yaml",
)
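If the compiled YAML is committed to git, a short content hash is one way to tell whether two compiled packages are actually identical, for instance before uploading a new pipeline version. This is a sketch using only the standard library; package_digest is a hypothetical helper, and using the hash as a version label is a suggestion rather than a KFP convention.

```python
import hashlib
from pathlib import Path


def package_digest(path, length=12):
    """Return a short SHA-256 hex digest of a compiled pipeline package."""
    digest = hashlib.sha256(Path(path).read_bytes()).hexdigest()
    return digest[:length]


# Only report a digest if the package has already been compiled
if Path("fraud_pipeline.yaml").exists():
    print("fraud_pipeline.yaml digest:", package_digest("fraud_pipeline.yaml"))
```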

Upload to the KFP UI or use the client:

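import os
import kfp

# OpenShift AI routes sit behind OAuth, so pass a bearer token.
# OC_TOKEN is an assumed variable name, e.g. export OC_TOKEN=$(oc whoami -t)
client = kfp.Client(
    host="https://ds-pipeline-dspa.apps.cluster.example.com",
    existing_token=os.environ["OC_TOKEN"],
)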

pipeline = client.upload_pipeline(
    pipeline_package_path="fraud_pipeline.yaml",
    pipeline_name="fraud-detection-training",
)

# Trigger a run
run = client.create_run_from_pipeline_package(
    pipeline_file="fraud_pipeline.yaml",
    arguments={"n_estimators": 200},
    run_name="training-run-v1",
)
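After triggering a run, a script often needs to block until it finishes, for example in CI. The sketch below wraps the client's wait_for_run_completion call; the helper name require_success is hypothetical, and it assumes the returned run object exposes a state string such as "SUCCEEDED", as the KFP v2 SDK does to my knowledge.

```python
def require_success(client, run_id, timeout_seconds=3600):
    """Wait for a run to finish and raise if it did not succeed."""
    finished = client.wait_for_run_completion(
        run_id=run_id, timeout=timeout_seconds
    )
    if finished.state != "SUCCEEDED":
        raise RuntimeError(f"run {run_id} ended in state {finished.state}")
    return finished


# Usage with the objects created above (needs a reachable pipeline server):
# require_success(client, run.run_id)
```

Taking the client as a parameter keeps the helper easy to exercise with a stand-in object when no cluster is available.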

Schedule recurring runs

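# create_recurring_run takes an experiment ID, not an experiment name
experiment = client.create_experiment(name="fraud-detection")

client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="weekly-retraining",
    pipeline_id=pipeline.pipeline_id,
    # KFP cron expressions have six fields, starting with seconds
    cron_expression="0 0 2 * * 1",   # every Monday at 02:00
    max_concurrency=1,
)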
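KFP's scheduler expects cron expressions with six space-separated fields (second, minute, hour, day of month, month, day of week), so a standard five-field crontab string is easy to get wrong. The helper below is a hypothetical convenience, not part of the SDK, that builds a weekly schedule in that six-field form and validates its inputs.

```python
def weekly_cron(day_of_week: int, hour: int, minute: int = 0) -> str:
    """Build a six-field cron string: sec min hour day-of-month month day-of-week.

    day_of_week uses 0 for Sunday through 6 for Saturday.
    """
    if not 0 <= day_of_week <= 6:
        raise ValueError("day_of_week must be between 0 and 6")
    if not 0 <= hour <= 23:
        raise ValueError("hour must be between 0 and 23")
    if not 0 <= minute <= 59:
        raise ValueError("minute must be between 0 and 59")
    return f"0 {minute} {hour} * * {day_of_week}"


print(weekly_cron(day_of_week=1, hour=2))
```

The result can be passed directly as the cron_expression argument of create_recurring_run.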

Monitor runs

# List recent runs via the CLI (KFP v2 executes runs as Argo Workflows)
oc get workflows.argoproj.io -n my-project

# Stream logs from a component pod
oc logs -l pipeline/runid=<run-id> -n my-project -f
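Run status is also available from the Python client, which avoids needing cluster-level access to pods. The sketch below assumes the KFP v2 client's list_runs method and the display_name and state attributes on the returned run objects; summarize_runs is a hypothetical helper name.

```python
def summarize_runs(client, limit=5):
    """Return (name, state) pairs for the most recently created runs."""
    response = client.list_runs(page_size=limit, sort_by="created_at desc")
    # The runs attribute can be None when no runs exist yet
    return [(r.display_name, r.state) for r in (response.runs or [])]


# Usage with an authenticated client (needs a reachable pipeline server):
# for name, state in summarize_runs(client):
#     print(f"{name}: {state}")
```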

Key takeaways

  • Each @dsl.component runs in its own container; pin base_image and package versions for reproducibility.
  • Use KFP artifact types (Dataset, Model, Metrics) instead of raw paths for lineage tracking.
  • Compile to YAML, version it in git, and upload with the Python client or the UI.
  • Scheduled recurring runs handle retraining cadence without external cron jobs.