New transformer for license and copyright header removal #332

Open
wants to merge 14 commits into base: dev from
7 changes: 4 additions & 3 deletions .make.versions
@@ -81,6 +81,9 @@ CODE2PARQUET_PYTHON_VERSION=$(DPK_VERSION)
CODE2PARQUET_RAY_VERSION=$(DPK_VERSION)
INGEST_TO_PARQUET_VERSION=$(DPK_VERSION)

HEADER_CLEANSER_PYTHON_VERSION=$(DPK_VERSION)
HEADER_CLEANSER_RAY_VERSION=$(DPK_VERSION)

################## ################## ################## ################## ################## ##################
# Begin versions that the repo depends on.

@@ -94,6 +97,4 @@ ifeq ($(KFPv2), 1)
WORKFLOW_SUPPORT_LIB=kfp_v2_workflow_support
else
WORKFLOW_SUPPORT_LIB=kfp_v1_workflow_support
endif


endif
62 changes: 62 additions & 0 deletions transforms/code/header_cleanser/Makefile
@@ -0,0 +1,62 @@
REPOROOT=../../..
# Use make help, to see the available rules
include $(REPOROOT)/.make.defaults

setup::
@# Help: Recursively make $@ all subdirs
$(MAKE) RULE=$@ .recurse

clean::
@# Help: Recursively make $@ all subdirs
$(MAKE) RULE=$@ .recurse

build::
@# Help: Recursively make $@ in subdirs
$(MAKE) RULE=$@ .recurse
venv::
@# Help: Recursively make $@ in subdirs
$(MAKE) RULE=$@ .recurse

image::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

publish::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

set-versions:
@# Help: Recursively $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test-image::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test-src::
@# Help: Recursively make $@ in all subdirs
$(MAKE) RULE=$@ .recurse

load-image::
@# Help: Recursively make $@ in all subdirs
$(MAKE) RULE=$@ .recurse

.PHONY: workflow-venv
workflow-venv:
$(MAKE) -C kfp_ray workflow-venv

.PHONY: workflow-test
workflow-test:
$(MAKE) -C kfp_ray workflow-test

.PHONY: workflow-upload
workflow-upload:
$(MAKE) -C kfp_ray workflow-upload

.PHONY: workflow-build
workflow-build:
$(MAKE) -C kfp_ray workflow-build
13 changes: 13 additions & 0 deletions transforms/code/header_cleanser/README.md
@@ -0,0 +1,13 @@
# Header Cleanser Transform
The header cleanser transform detects and removes license and copyright headers from the input data.
Per the set of
[transform project conventions](../../README.md#transform-project-conventions)
the following runtimes are available:

* [python](python/README.md) - provides the base python-based transformation
implementation.
* [ray](ray/README.md) - enables the running of the base python transformation
in a Ray runtime.
* [kfp_ray](kfp_ray/README.md) - enables running the ray docker image
in a kubernetes cluster using a generated `yaml` file.
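
The detection and removal logic lives in the python runtime (see its README). Purely as an illustrative
sketch of the idea, and not the transform's actual implementation, stripping a leading license/copyright
comment block from a file's contents could look roughly like this (all names below are hypothetical):

```python
# Illustrative sketch only; the real transform's detection logic may differ.
import re

# Phrases that typically indicate a license or copyright notice.
LICENSE_PATTERN = re.compile(r"copyright|licensed under|apache license|mit license", re.IGNORECASE)


def remove_header(contents: str, comment_prefix: str = "#") -> str:
    """Drop a leading comment block if it looks like a license/copyright header."""
    lines = contents.splitlines(keepends=True)
    header_end = 0
    # Walk the run of comment lines at the very top of the file.
    while header_end < len(lines) and lines[header_end].lstrip().startswith(comment_prefix):
        header_end += 1
    header = "".join(lines[:header_end])
    if header_end and LICENSE_PATTERN.search(header):
        return "".join(lines[header_end:])
    return contents
```
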
48 changes: 48 additions & 0 deletions transforms/code/header_cleanser/kfp_ray/Makefile
@@ -0,0 +1,48 @@
REPOROOT=${CURDIR}/../../../../
WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
include $(REPOROOT)/transforms/.make.workflows

SRC_DIR=${CURDIR}/../ray/

PYTHON_WF := $(shell find ./ -name '*_wf.py')
YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF})

workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE}

.PHONY: clean
clean:
@# Help: Clean up the virtual environment.
rm -rf ${REPOROOT}/transforms/venv

venv::

build::

setup::

test::

test-src::

test-image::

publish::

image::

load-image::


.PHONY: workflow-build
workflow-build: workflow-venv
$(MAKE) $(YAML_WF)

.PHONY: workflow-test
workflow-test: workflow-build
$(MAKE) .workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=header_cleanser_wf.yaml

.PHONY: workflow-upload
workflow-upload: workflow-build
@for file in $(YAML_WF); do \
$(MAKE) .workflows.upload-pipeline PIPELINE_FILE=$$file; \
done
28 changes: 28 additions & 0 deletions transforms/code/header_cleanser/kfp_ray/README.md
@@ -0,0 +1,28 @@
# Header Cleanser Ray-based KubeFlow Pipeline Transformation


## Summary
This project allows execution of the [header cleanser Ray transform](../ray) as a
[KubeFlow Pipeline](https://www.kubeflow.org/docs/components/pipelines/overview/).

The detailed pipeline is presented in the [Simplest Transform pipeline tutorial](../../../../kfp/doc/simple_transform_pipeline.md).

## Compilation

In order to compile the pipeline definitions, run
```shell
make workflow-build
```
from this directory. It creates a virtual environment (via `make workflow-venv`) and then compiles the pipeline
definitions in the folder. The virtual environment is created once for all transforms.

Note: the pipeline definitions can be compiled and executed on both KFPv1 and KFPv2. For now, KFPv1 is the default. If you
prefer KFPv2, please do the following:
```shell
make clean
export KFPv2=1
make workflow-build
```

The next steps are described in [Deploying a pipeline](../../../../kfp/doc/simple_transform_pipeline.md#deploying-a-pipeline-)
and [Executing pipeline and watching execution results](../../../../kfp/doc/simple_transform_pipeline.md#executing-pipeline-and-watching-execution-results-)
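
Once compiled, the pipeline can also be exercised with the workflow targets defined in this directory's
Makefile (assuming a reachable KFP endpoint configured as described in the documents above):
```shell
# run the compiled pipeline end-to-end as a smoke test
make workflow-test

# upload the compiled header_cleanser_wf.yaml to the KFP server
make workflow-upload
```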
217 changes: 217 additions & 0 deletions transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py
@@ -0,0 +1,217 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils


# the name of the job script
EXEC_SCRIPT_NAME: str = "header_cleanser_transform_ray.py"
PREFIX: str = ""

task_image = "quay.io/dataprep1/data-prep-kit/header-cleanser-ray:0.2.1.dev0"

# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.1.dev0"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"

# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
def compute_exec_params_func(
worker_options: str,
actor_options: str,
data_s3_config: str,
data_max_files: int,
data_num_samples: int,
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: str,
header_cleanser_contents_column_name: str,
header_cleanser_license: bool,
header_cleanser_copyright: bool,

) -> dict:
Collaborator: missing specific parameters

Collaborator: updated
from runtime_utils import KFPUtils

return {
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options),
"runtime_worker_options": actor_options,
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": runtime_code_location,
"header_cleanser_contents_column_name": header_cleanser_contents_column_name,
"header_cleanser_license": header_cleanser_license,
"header_cleanser_copyright":header_cleanser_copyright,
}


# KFPv1 and KFPv2 use different methods to create a component from a function. KFPv1 uses the
# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path.
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER


# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
# execute job
execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml")
# clean up Ray
cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")

# Task name is part of the pipeline name, the ray cluster name and the job name in DMF.
TASK_NAME: str = "header_cleanser"


# Pipeline to invoke execution on remote resource
@dsl.pipeline(
name=TASK_NAME + "-ray-pipeline",
description="Pipeline for header cleaning task",
)
def header_cleanser(
# Ray cluster
ray_name: str = "header_cleanser-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: str = '{"cpu": 4, "memory": 4, "image": "' + task_image + '" }',
ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, \
"image": "'
+ task_image
+ '" }',
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/header_cleanser/input/', 'output_folder': 'test/header_cleanser/output/'}",
data_s3_access_secret: str = "s3-secret",
data_max_files: int = -1,
data_num_samples: int = -1,
# orchestrator
runtime_actor_options: str = "{'num_cpus': 0.8}",
runtime_pipeline_id: str = "runtime_pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
# header cleanser parameters
header_cleanser_contents_column_name: str = "contents",
header_cleanser_license: bool = True,
header_cleanser_copyright: bool = True,
# additional parameters
Collaborator: the parameters above should have prefix. Also you can define license and copyright as boolean

Collaborator: updated
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 800, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
):
"""
Pipeline to execute Header Cleanser transform
:param ray_name: name of the Ray cluster
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
image - image to use
image_pull_secret - image pull secret
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
min_replicas - min number of replicas
cpu - number of cpus
memory - memory
image - image to use
image_pull_secret - image pull secret
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
wait_cluster_ready_tmout - time to wait for cluster ready, sec
wait_cluster_up_tmout - time to wait for cluster up, sec
wait_job_ready_tmout - time to wait for job ready, sec
wait_print_tmout - time between prints, sec
http_retries - http retries for API server calls
:param data_s3_access_secret - s3 access secret
:param data_s3_config - s3 configuration
:param data_max_files - max files to process
:param data_num_samples - num samples to process
:param runtime_actor_options - actor options
:param runtime_pipeline_id - pipeline id
:param header_cleanser_contents_column_name - name of the column that holds the data to process
:param header_cleanser_license - whether to detect and remove license headers
:param header_cleanser_copyright - whether to detect and remove copyright headers
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
compute_exec_params = compute_exec_params_op(
worker_options=ray_worker_options,
actor_options=runtime_actor_options,
data_s3_config=data_s3_config,
data_max_files=data_max_files,
data_num_samples=data_num_samples,
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
header_cleanser_contents_column_name=header_cleanser_contents_column_name,
header_cleanser_license=header_cleanser_license,
header_cleanser_copyright=header_cleanser_copyright,
)

ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
ray_cluster = create_ray_op(
ray_name=ray_name,
run_id=run_id,
ray_head_options=ray_head_options,
ray_worker_options=ray_worker_options,
server_url=server_url,
additional_params=additional_params,
)
ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
ray_cluster.after(compute_exec_params)

# Execute job
execute_job = execute_ray_jobs_op(
ray_name=ray_name,
run_id=run_id,
additional_params=additional_params,
# note that the parameters below are specific for the header cleanser transform
exec_params=compute_exec_params.output,
exec_script_name=EXEC_SCRIPT_NAME,
server_url=server_url,
)
ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)

execute_job.after(ray_cluster)


if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile(header_cleanser, __file__.replace(".py", ".yaml"))
1 change: 1 addition & 0 deletions transforms/code/header_cleanser/python/.dockerignore
@@ -0,0 +1 @@
venv/