# Copyright 2026 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compile a Kale :class:`~kale.pipeline.Pipeline` into a Kubeflow Pipelines v2 DSL script.
This module renders the Jinja2 templates in ``kale/templates/`` to produce a
ready-to-run KFP v2 pipeline script, formats it, and optionally hands it off to
the KFP SDK for compilation and submission.
"""
import argparse
import logging
import os
import re
from typing import NamedTuple, Optional

import autopep8
from jinja2 import Environment, FileSystemLoader, PackageLoader

from kale import __version__ as KALE_VERSION
from kale.common import graphutils, kfputils, utils
from kale.common.imports import get_packages_to_install
from kale.pipeline import DEFAULT_BASE_IMAGE, Pipeline, PipelineParam, Step
log = logging.getLogger(__name__)

# Jinja2 template file names, looked up through the compiler's template
# environment (by default the ``templates`` directory of the ``kale`` package).
NB_FN_TEMPLATE = "nb_function_template.jinja2"
PIPELINE_TEMPLATE = "pipeline_template.jinja2"

# Names passed to the component template as ``kfp_dsl_artifact_imports``
# (KFP DSL artifact classes), kept in this exact order.
KFP_DSL_ARTIFACT_IMPORTS = "Dataset Model Metrics ClassificationMetrics Artifact HTML".split()
class Artifact(NamedTuple):
    """A Step artifact passed to the component template.

    The compiler builds one instance per sorted step input/output variable.
    """

    # Variable name the artifact carries.
    name: str
    # KFP artifact class name; the compiler uses "Model" or "Dataset".
    type: str
    # True for step inputs, False for step outputs.
    is_input: bool = False
class Compiler:
    """Converts a Pipeline object into a KFP executable.

    Compiler provides the tools to convert a Pipeline object into an
    executable script that uses the KFP DSL to create and upload a
    new pipeline.

    The Pipeline object is assumed to provide all the necessary information
    (environment, configuration, etc...) for the script to be compiled.
    """

    def __init__(self, pipeline: Pipeline, imports_and_functions: str):
        """Create a compiler for ``pipeline``.

        Args:
            pipeline: The Kale pipeline to convert.
            imports_and_functions: Source code with the notebook's imports
                and functions; it is parsed to decide which pip packages the
                generated components install.
        """
        self.pipeline = pipeline
        # Jinja2 environment, created lazily by _get_templating_env().
        self.templating_env = None
        # Generated DSL source, set by compile().
        self.dsl_source = ""
        # Absolute path of the saved DSL script, set by _save_compiled_code().
        self.dsl_script_path = None
        self.imports_and_functions = imports_and_functions

    @staticmethod
    def _get_args():
        """Parse the ``-K/--kfp`` command-line flag."""
        parser = argparse.ArgumentParser(description="Run Kale Pipeline")
        parser.add_argument("-K", "--kfp", action="store_true")
        return parser.parse_args()

    def compile_and_run(self):
        """First compile the Pipeline to DSL and then run it."""
        self.compile()
        self.run()

    def compile(self):
        """Convert the Pipeline to KFP DSL and save it to disk.

        Returns:
            The absolute path of the saved DSL script.
        """
        log.info("Compiling Pipeline into KFP DSL code")
        self.dsl_source = self.generate_dsl()
        return self._save_compiled_code()

    def run(self):
        """Compile, upload and run the previously generated KFP script.

        Raises:
            RuntimeError: If :meth:`compile` has not been called yet.
        """
        if not self.dsl_script_path:
            raise RuntimeError(
                "The Compiler has yet to generate a new KFP"
                " DSL script. Please run the `compile` function"
                " first."
            )
        self._run_compiled_code(self.dsl_script_path)

    def generate_dsl(self):
        """Generate a Python KFP DSL executable starting from the pipeline.

        Returns:
            str: A Python executable script.

        Raises:
            ValueError: If the pipeline has no steps.
        """
        # Fail early if there are no steps in the pipeline.
        if not getattr(self.pipeline, "steps", None):
            raise ValueError("Task is missing from pipeline.")
        lightweight_components = [
            self.generate_lightweight_component(step) for step in self.pipeline.steps
        ]
        return self.generate_pipeline(lightweight_components)

    @staticmethod
    def _clean_param_name(param_name: str) -> str:
        """Return the identifier used for a pipeline parameter in generated code.

        All-uppercase names (e.g. ``LR``) are lower-cased and suffixed with
        ``_param`` (``lr_param``); other names are returned unchanged.
        """
        if param_name.isupper():
            return f"{param_name.lower()}_param"
        return param_name

    @staticmethod
    def _artifact_type(var_name: str) -> str:
        """Return the KFP artifact class name used for a step variable.

        Name-based heuristic: a variable whose name contains ``"model"`` is
        passed as ``Model``; every other variable is passed as ``Dataset``.
        """
        return "Model" if "model" in var_name else "Dataset"

    def _pipeline_params(self):
        """Yield ``(name, param)`` for each PipelineParam of the pipeline.

        Entries of ``pipeline_parameters`` that are not PipelineParam
        instances are skipped; nothing is yielded when the attribute is
        missing or empty.
        """
        params = getattr(self.pipeline, "pipeline_parameters", None)
        if not params:
            return
        for param_name, param in params.items():
            if isinstance(param, PipelineParam):
                yield param_name, param

    @staticmethod
    def _escape_source_chunk(source: str) -> str:
        """Escape one chunk of step source for embedding in a ''' string.

        Each line is escaped with the ``unicode_escape`` codec and the lines
        are re-joined with a newline plus indentation. ``'''`` is escaped
        afterwards because ``unicode_escape`` does not escape quotes.
        """
        # NOTE(review): the indentation after "\n" is expected to line up with
        # the function body in the component template; confirm the width.
        escaped = "\n ".join(
            line.encode("unicode_escape").decode("utf-8") for line in source.splitlines()
        )
        return re.sub(r"'''", "\\'\\'\\'", escaped)

    def generate_lightweight_component(self, step: Step):
        """Generate Python code using the notebook function template.

        ``step.source`` is temporarily replaced with its escaped form while
        the template renders, and is restored afterwards. This means the
        step is unchanged after the call and compiling twice does not
        double-escape it.

        Args:
            step: The pipeline step to render as a KFP lightweight component.

        Returns:
            str: The rendered component code, formatted with autopep8.
        """
        step_inputs_list = sorted(step.ins) if getattr(step, "ins", None) else []
        step_outputs_list = sorted(step.outs) if getattr(step, "outs", None) else []
        step_inputs = [
            Artifact(name=var_name, type=self._artifact_type(var_name), is_input=True)
            for var_name in step_inputs_list
        ]
        step_outputs = [
            Artifact(name=var_name, type=self._artifact_type(var_name), is_input=False)
            for var_name in step_outputs_list
        ]

        # Parameters without defaults must precede those with defaults in the
        # generated function signature.
        params_without_defaults = [f"{step.name}_html_report: Output[HTML]"]
        if getattr(step, "metrics", None):
            params_without_defaults.append("kale_metrics_artifact: Output[Metrics]")
        params_without_defaults.extend(
            f"{art.name}_input_artifact: Input[{art.type}]" for art in step_inputs
        )
        params_without_defaults.extend(
            f"{art.name}_output_artifact: Output[{art.type}]" for art in step_outputs
        )

        params_with_defaults = []
        # Original parameter name -> {clean identifier: default value}.
        pipeline_params = {}
        for param_name, param in self._pipeline_params():
            clean_name = self._clean_param_name(param_name)
            param_type = param.param_type or "str"
            params_with_defaults.append(f"{clean_name}: {param_type} = {param.param_value!r}")
            pipeline_params[param_name] = {clean_name: param.param_value}
        component_signature_args = ", ".join(params_without_defaults + params_with_defaults)

        template = self._get_templating_env().get_template(NB_FN_TEMPLATE)
        packages_list = self._get_package_list_from_imports()
        pip_index_urls = utils.compute_pip_index_urls()
        pip_trusted_hosts = utils.compute_trusted_hosts()

        raw_source = step.source
        step.source = [self._escape_source_chunk(chunk) for chunk in raw_source]
        try:
            fn_code = template.render(
                pip_index_urls=pip_index_urls,
                pip_trusted_hosts=pip_trusted_hosts,
                step=step,
                component_signature_args=component_signature_args,
                pipeline_params=pipeline_params,
                packages_list=packages_list,
                step_inputs=step_inputs,
                step_outputs=step_outputs,
                kfp_dsl_artifact_imports=KFP_DSL_ARTIFACT_IMPORTS,
                default_base_image=DEFAULT_BASE_IMAGE,
                **self.pipeline.config.to_dict(),
            )
        finally:
            # Give the caller back the unescaped source.
            step.source = raw_source
        return autopep8.fix_code(fn_code)

    def _resolve_input_sources(self, step_name, input_vars):
        """Map each input variable of ``step_name`` to the step producing it.

        Ancestors are searched in the order returned by
        ``graphutils.get_ordered_ancestors``. The first ancestor whose
        ``outs`` contains the variable wins, and ``"UNKNOWN"`` is used when
        none does.
        """
        ancestor_outs = [
            (anc_name, getattr(self.pipeline.get_step(anc_name), "outs", None) or ())
            for anc_name in graphutils.get_ordered_ancestors(self.pipeline, step_name)
        ]
        return {
            input_var: next(
                (anc_name for anc_name, outs in ancestor_outs if input_var in outs),
                "UNKNOWN",
            )
            for input_var in input_vars
        }

    def generate_pipeline(self, lightweight_components):
        """Generate Python code using the pipeline template.

        Args:
            lightweight_components: Rendered component sources, as returned
                by :meth:`generate_lightweight_component`.

        Returns:
            str: The rendered pipeline script, formatted with autopep8.
        """
        template = self._get_templating_env().get_template(PIPELINE_TEMPLATE)
        steps = getattr(self.pipeline, "steps", None) or []

        step_outputs = {}
        step_inputs = {}
        step_inputs_sources = {}
        for step in steps:
            if getattr(step, "ins", None):
                step_inputs[step.name] = sorted(step.ins)
                step_inputs_sources[step.name] = self._resolve_input_sources(
                    step.name, step_inputs[step.name]
                )
            if getattr(step, "outs", None):
                step_outputs[step.name] = sorted(step.outs)

        pipeline_param_info = {
            param_name: {
                "clean_name": self._clean_param_name(param_name),
                "type": param.param_type,
                "default": param.param_value,
            }
            for param_name, param in self._pipeline_params()
        }

        # Component names: step names with "_" replaced by "-". Always
        # defined (possibly empty) so rendering never hits an unbound name.
        component_names = {step.name: step.name.replace("_", "-") for step in steps}

        pipeline_code = template.render(
            pipeline=self.pipeline,
            lightweight_components=lightweight_components,
            step_outputs=step_outputs,
            step_inputs=step_inputs,
            step_inputs_sources=step_inputs_sources,
            pipeline_param_info=pipeline_param_info,
            component_names=component_names,
            **self.pipeline.config.to_dict(),
        )
        # fix code style using pep8 guidelines
        return autopep8.fix_code(pipeline_code)

    def _get_package_list_from_imports(self):
        """Extract pip-installable package names from imports using AST.

        Uses the imports module to parse Python import statements via AST
        and resolve them to their corresponding PyPI package names. This
        properly handles all import forms and filters out stdlib modules.

        Returns:
            A sorted list of unique PyPI package names to install.
        """
        package_names = set()
        # Always include kale and kfp as dependencies; pin kale only when
        # its version is known.
        if KALE_VERSION != "0+unknown":
            package_names.add(f"kubeflow-kale=={KALE_VERSION}")
        else:
            package_names.add("kubeflow-kale")
        package_names.add("kfp>=2.0.0")
        # Parse imports using AST and resolve to PyPI package names
        package_names.update(get_packages_to_install(self.imports_and_functions))
        return sorted(package_names)

    def _get_templating_env(self, templates_path=None):
        """Return the Jinja2 environment, creating and caching it on first use.

        Args:
            templates_path: Directory to load templates from. When omitted,
                the ``templates`` directory of the ``kale`` package is used.
                Only honoured on the first call, because the environment is
                cached afterwards.
        """
        if self.templating_env:
            return self.templating_env
        if templates_path:
            loader = FileSystemLoader(templates_path)
        else:
            loader = PackageLoader("kale", "templates")
        template_env = Environment(loader=loader)
        # add custom filters
        template_env.filters["add_suffix"] = lambda s, suffix: s + suffix
        template_env.filters["add_prefix"] = lambda s, prefix: prefix + s
        # wrap a value in double quotes; None is passed through unchanged
        template_env.filters["quote_if_not_none"] = lambda x: f'"{x}"' if x is not None else None
        self.templating_env = template_env
        return template_env

    def _save_compiled_code(self, path: Optional[str] = None) -> str:
        """Write ``self.dsl_source`` to ``<path>/<pipeline_name>.kale.py``.

        When ``path`` is not given, ``config.output_path`` is used, resolved
        against the current working directory. If that is also unset, the
        code goes to ``.kale/`` in the current working directory. The target
        directory is created if missing.

        Returns:
            The absolute path of the written script, also stored in
            ``self.dsl_script_path``.
        """
        if not path:
            # Resolve relative to CWD (the notebook's working directory);
            # default to the hidden .kale/ directory.
            config_output_path = self.pipeline.config.output_path
            path = os.path.join(os.getcwd(), config_output_path or ".kale")
        os.makedirs(path, exist_ok=True)
        log.info("Saving generated code in %s", path)
        filename = f"{self.pipeline.config.pipeline_name}.kale.py"
        output_path = os.path.abspath(os.path.join(path, filename))
        # Explicit UTF-8 so the output does not depend on the platform locale.
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(self.dsl_source)
        log.info("Successfully saved generated code: %s", output_path)
        self.dsl_script_path = output_path
        return output_path

    def _run_compiled_code(self, script_path: str):
        """Compile ``script_path`` with KFP, upload it, and start a run.

        Args:
            script_path: Path of the generated DSL script.
        """
        pipeline_name = self.pipeline.config.pipeline_name
        pipeline_yaml_path = kfputils.compile_pipeline(script_path, pipeline_name)
        pipeline_id, version_id = kfputils.upload_pipeline(pipeline_yaml_path, pipeline_name)
        kfputils.run_pipeline(
            experiment_name=self.pipeline.config.experiment_name,
            pipeline_id=pipeline_id,
            version_id=version_id,
        )