Skip to content

Commit

Permalink
Added kfp_ray
Browse files Browse the repository at this point in the history
Signed-off-by: Yash Kalathiya <[email protected]>
  • Loading branch information
ykalathiya committed Jul 1, 2024
1 parent 51d4cea commit 0f9651a
Show file tree
Hide file tree
Showing 16 changed files with 307 additions and 19 deletions.
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/createRayClusterComponent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ inputs:

implementation:
container:
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0.dev6"
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.1.dev0"
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/deleteRayClusterComponent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ inputs:

implementation:
container:
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0.dev6"
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.1.dev0"
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/executeRayJobComponent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ inputs:

implementation:
container:
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0.dev6"
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.1.dev0"
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ inputs:

implementation:
container:
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0.dev6"
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.1.dev0"
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/executeSubWorkflowComponent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ outputs:

implementation:
container:
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0.dev6"
image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.1.dev0"
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists, and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
Expand Down
4 changes: 3 additions & 1 deletion transforms/code/header_cleanser/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ the following runtimes are available:
* [python](python/README.md) - provides the base python-based transformation
implementation.
* [ray](ray/README.md) - enables the running of the base python transformation
in a Ray runtime
in a Ray runtime.
* [kfp_ray](kfp_ray/README.md) - enables running the ray docker image
in a kubernetes cluster using a generated `yaml` file.
48 changes: 48 additions & 0 deletions transforms/code/header_cleanser/kfp_ray/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
REPOROOT=${CURDIR}/../../../../
WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
include $(REPOROOT)/transforms/.make.workflows

SRC_DIR=${CURDIR}/../ray/

PYTHON_WF := $(shell find ./ -name '*_wf.py')
YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF})

workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE}

.PHONY: clean
clean:
@# Help: Clean up the virtual environment.
rm -rf ${REPOROOT}/transforms/venv

venv::

build::

setup::

test::

test-src::

test-image::

publish::

image::

load-image::


.PHONY: workflow-build
workflow-build: workflow-venv
$(MAKE) $(YAML_WF)

.PHONY: workflow-test
workflow-test: workflow-build
$(MAKE) .workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=header_cleanser_wf.yaml

.PHONY: workflow-upload
workflow-upload: workflow-build
@for file in $(YAML_WF); do \
$(MAKE) .workflows.upload-pipeline PIPELINE_FILE=$$file; \
done
28 changes: 28 additions & 0 deletions transforms/code/header_cleanser/kfp_ray/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Header Cleanser Ray-base KubeFlow Pipeline Transformation


## Summary
This project allows execution of the [noop Ray transform](../ray) as a
[KubeFlow Pipeline](https://www.kubeflow.org/docs/components/pipelines/overview/)

The detail pipeline is presented in the [Simplest Transform pipeline tutorial](../../../../kfp/doc/simple_transform_pipeline.md)

## Compilation

In order to compile pipeline definitions run
```shell
make workflow-build
```
from the directory. It creates a virtual environment (make workflow-venv) and after that compiles the pipeline
definitions in the folder. The virtual environment is created once for all transformers.

Note: the pipelines definitions can be compiled and executed on KFPv1 and KFPv2. Meantime, KFPv1 is our default. If you
prefer KFPv2, please do the following:
```shell
make clean
export KFPv2=1
make workflow-build
```

The next steps are described in [Deploying a pipeline](../../../../kfp/doc/simple_transform_pipeline.md#deploying-a-pipeline-)
and [Executing pipeline and watching execution results](../../../../kfp/doc/simple_transform_pipeline.md#executing-pipeline-and-watching-execution-results-)
207 changes: 207 additions & 0 deletions transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils


# the name of the job script
EXEC_SCRIPT_NAME: str = "header_cleanser_transform_ray.py"
PREFIX: str = ""

task_image = "quay.io/dataprep1/data-prep-kit/header-cleanser-ray:0.2.1.dev0"

# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.1.dev0"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"

# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
def compute_exec_params_func(
worker_options: str,
actor_options: str,
data_s3_config: str,
data_max_files: int,
data_num_samples: int,
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: str,
) -> dict:
from runtime_utils import KFPUtils

return {
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options),
"runtime_worker_options": actor_options,
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": runtime_code_location,
}


# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the
# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path.
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER


# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
# execute job
execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml")
# clean up Ray
cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")

# Task name is part of the pipeline name, the ray cluster name and the job name in DMF.
TASK_NAME: str = "header_cleanser"


# Pipeline to invoke execution on remote resource
@dsl.pipeline(
name=TASK_NAME + "-ray-pipeline",
description="Pipeline for header cleaning task",
)
def header_cleanser(
# Ray cluster
ray_name: str = "header_cleanser-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: str = '{"cpu": 4, "memory": 4, "image": "' + task_image + '" }',
ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, \
"image": "'
+ task_image
+ '" }',
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/header_cleanser/input/', 'output_folder': 'test/header_cleanser/output/'}",
data_s3_access_secret: str = "s3-secret",
data_max_files: int = -1,
data_num_samples: int = -1,
# orchestrator
runtime_actor_options: str = "{'num_cpus': 0.8}",
runtime_pipeline_id: str = "runtime_pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
# code quality parameters
contents_column_name: str = "contents",
license: str = "true",
copyright: str = "true",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 800, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
):
"""
Pipeline to execute Header Cleanser transform
:param ray_name: name of the Ray cluster
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
image - image to use
image_pull_secret - image pull secret
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
min_replicas - min number of replicas
cpu - number of cpus
memory - memory
image - image to use
image_pull_secret - image pull secret
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
wait_cluster_ready_tmout - time to wait for cluster ready, sec
wait_cluster_up_tmout - time to wait for cluster up, sec
wait_job_ready_tmout - time to wait for job ready, sec
wait_print_tmout - time between prints, sec
http_retries - http retries for API server calls
:param data_s3_access_secret - s3 access secret
:param data_s3_config - s3 configuration
:param data_max_files - max files to process
:param data_num_samples - num samples to process
:param runtime_actor_options - actor options
:param runtime_pipeline_id - pipeline id
:param contents_column_name - Name of the column holds the data to process
:param license - Hold value true or false to delete/remove license or not.
:param copyright - Hold value true or false to delete/remove copyright or not.
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
compute_exec_params = compute_exec_params_op(
worker_options=ray_worker_options,
actor_options=runtime_actor_options,
data_s3_config=data_s3_config,
data_max_files=data_max_files,
data_num_samples=data_num_samples,
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
)

ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
ray_cluster = create_ray_op(
ray_name=ray_name,
run_id=run_id,
ray_head_options=ray_head_options,
ray_worker_options=ray_worker_options,
server_url=server_url,
additional_params=additional_params,
)
ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
ray_cluster.after(compute_exec_params)

# Execute job
execute_job = execute_ray_jobs_op(
ray_name=ray_name,
run_id=run_id,
additional_params=additional_params,
# note that the parameters below are specific for NOOP transform
exec_params=compute_exec_params.output,
exec_script_name=EXEC_SCRIPT_NAME,
server_url=server_url,
)
ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)

execute_job.after(ray_cluster)


if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile(header_cleanser, __file__.replace(".py", ".yaml"))
3 changes: 2 additions & 1 deletion transforms/code/header_cleanser/python/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ publish-image:: .transforms.publish-image-python
setup:: .transforms.setup

# distribution versions is the same as image version.
$(MAKE) TRANSFORM_PYTHON_VERSION=${HEADER_CLEANSER_PYTHON_VERSION} TOML_VERSION=$(DOCKER_IMAGE_VERSION) .transforms.set-versions
set-versions:
$(MAKE) TOML_VERSION=$(DOCKER_IMAGE_VERSION) .defaults.update-toml

build-dist:: set-versions .defaults.build-dist

Expand Down
9 changes: 6 additions & 3 deletions transforms/code/header_cleanser/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ After locating the position of license or copyright in the input code/sample, th

The set of dictionary keys holding configuration for values are as follows:

* --header_cleanser_contents_column_name - specifies the column name which holds code content. By default the value is 'contents'.
* --header_cleanser_license - specifies the bool value for removing license or not. Write true or false. Default value is True.
* --header_cleanser_copyright - specifies the bool value for removing copyright or not. Write true or false. Default value is True.
* contents_column_name - used to define input column name. Default value is 'contents'.
* license - write 'true' to remove license from input data else 'false'. By default set as 'true'.
* copyright - write 'true' to remove copyright from input data else 'false'. by default set as 'true'.

## Running
You can run the [header_cleanser_local.py](src/header_cleanser_local.py) (python-only implementation) or [header_cleanser_local_ray.py](ray/src/header_cleanser_local_ray.py) (ray-based implementation) to transform the `test1.parquet` file in [test input data](test-data/input) to an `output` directory. The directory will contain both the new annotated `test1.parquet` file and the `metadata.json` file.
Expand All @@ -27,6 +27,9 @@ You can run the [header_cleanser_local.py](src/header_cleanser_local.py) (python
When running the transform with the Ray launcher (i.e. TransformLauncher),
the following command line arguments are available in addition to
the [python launcher](../../../../data-processing-lib/doc/python-launcher-options.md).
* --header_cleanser_contents_column_name - set the contents_column_name configuration key.
* --header_cleanser_license - set the license configuration key.
* --header_cleanser_copyright - set the copyright configuration key.

### Running the samples
To run the samples, use the following `make` targets
Expand Down
4 changes: 2 additions & 2 deletions transforms/code/header_cleanser/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "dpk_header_cleanser_transform_python"
version = "0.2.0.dev6"
version = "0.2.1.dev0"
requires-python = ">=3.10"
description = "License and Copyright Removal Transform for Python"
license = {text = "Apache-2.0"}
Expand All @@ -9,7 +9,7 @@ authors = [
{ name = "Yash kalathiya", email = "[email protected]" },
]
dependencies = [
"data-prep-toolkit==0.2.0.dev6",
"data-prep-toolkit==0.2.1.dev0",
"scancode-toolkit==32.1.0",
]

Expand Down
Loading

0 comments on commit 0f9651a

Please sign in to comment.