Skip to content

Commit

Permalink
move dataflow affinity logic to fluidapp controller (#4138)
Browse files Browse the repository at this point in the history
* use fluidapp to support data flow affinity

Signed-off-by: xliuqq <[email protected]>

* make fluidapp dataop controller optional

Signed-off-by: xliuqq <[email protected]>

* fix check

Signed-off-by: xliuqq <[email protected]>

* add unit test for injectPodNodeLabelsToJob

Signed-off-by: xliuqq <[email protected]>

* fix: use library.fluid.labels instead of fluid.io/managed-by

Signed-off-by: xliuqq <[email protected]>

* fix comment error and reslove conflicts

Signed-off-by: xliuqq <[email protected]>

* fix annotation nil error

Signed-off-by: xliuqq <[email protected]>

* fix e2e test, serverless pod use serverless.fluid.io/inject and fluid.io/managed-by two labels

Signed-off-by: xliuqq <[email protected]>

---------

Signed-off-by: xliuqq <[email protected]>
  • Loading branch information
xliuqq committed Jun 24, 2024
1 parent d42a2e5 commit 620babc
Show file tree
Hide file tree
Showing 59 changed files with 1,193 additions and 506 deletions.
9 changes: 0 additions & 9 deletions api/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,6 @@ type Condition struct {
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
}

type OperationType string

const (
DataLoadType OperationType = "DataLoad"
DataBackupType OperationType = "DataBackup"
DataMigrateType OperationType = "DataMigrate"
DataProcessType OperationType = "DataProcess"
)

// AffinityPolicy the strategy for the affinity between Data Operation Pods.
type AffinityPolicy string

Expand Down
1 change: 1 addition & 0 deletions charts/fluid-dataloader/alluxio/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ spec:
role: dataload-pod
app: alluxio
targetDataset: {{ required "targetDataset should be set" .Values.dataloader.targetDataset }}
{{- include "library.fluid.labels" . | nindent 8 }}
{{- if .Values.dataloader.labels }}
{{- range $key, $val := .Values.dataloader.labels }}
{{ $key | quote }}: {{ $val | quote }}
Expand Down
1 change: 1 addition & 0 deletions charts/fluid-dataloader/goosefs/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ spec:
role: dataload-pod
app: goosefs
targetDataset: {{ required "targetDataset should be set" .Values.dataloader.targetDataset }}
{{- include "library.fluid.labels" . | nindent 8 }}
spec:
{{- if .Values.dataloader.schedulerName }}
schedulerName: {{ .Values.dataloader.schedulerName }}
Expand Down
1 change: 1 addition & 0 deletions charts/fluid-dataloader/jindo/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ spec:
role: dataload-pod
app: jindofs
targetDataset: {{ required "targetDataset should be set" .Values.dataloader.targetDataset }}
{{- include "library.fluid.labels" . | nindent 8 }}
spec:
{{- if .Values.dataloader.schedulerName }}
schedulerName: {{ .Values.dataloader.schedulerName }}
Expand Down
1 change: 1 addition & 0 deletions charts/fluid-dataloader/jindocache/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ spec:
role: dataload-pod
app: jindocache
targetDataset: {{ required "targetDataset should be set" .Values.dataloader.targetDataset }}
{{- include "library.fluid.labels" . | nindent 8 }}
{{- if .Values.dataloader.labels }}
{{- range $key, $val := .Values.dataloader.labels }}
{{ $key | quote }}: {{ $val | quote }}
Expand Down
1 change: 1 addition & 0 deletions charts/fluid-dataloader/jindofsx/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ spec:
role: dataload-pod
app: jindofsx
targetDataset: {{ required "targetDataset should be set" .Values.dataloader.targetDataset }}
{{- include "library.fluid.labels" . | nindent 8 }}
{{- if .Values.dataloader.labels }}
{{- range $key, $val := .Values.dataloader.labels }}
{{ $key | quote }}: {{ $val | quote }}
Expand Down
1 change: 1 addition & 0 deletions charts/fluid-dataloader/juicefs/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ spec:
role: dataload-pod
app: juicefs
targetDataset: {{ required "targetDataset should be set" .Values.dataloader.targetDataset }}
{{- include "library.fluid.labels" . | nindent 8 }}
{{- if .Values.dataloader.labels }}
{{- range $key, $val := .Values.dataloader.labels }}
{{ $key | quote }}: {{ $val | quote }}
Expand Down
3 changes: 3 additions & 0 deletions charts/fluid-datamigrate/juicefs/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ metadata:
app: juicefs
targetDataset: {{ required "targetDataset should be set" .Values.datamigrate.targetDataset }}
{{- include "library.fluid.labels" . | nindent 4 }}
# indicates the parallel task number
parallelism: {{ .Values.datamigrate.parallelism | quote }}
ownerReferences:
{{- if .Values.owner.enabled }}
- apiVersion: {{ .Values.owner.apiVersion }}
Expand Down Expand Up @@ -38,6 +40,7 @@ spec:
app: juicefs
targetDataset: {{ required "targetDataset should be set" .Values.datamigrate.targetDataset }}
fluid.io/operation: migrate-{{ .Release.Namespace }}-{{ .Release.Name }}
{{- include "library.fluid.labels" . | nindent 8 }}
{{- if .Values.datamigrate.labels }}
{{- range $key, $val := .Values.datamigrate.labels }}
{{ $key | quote }}: {{ $val | quote }}
Expand Down
1 change: 1 addition & 0 deletions charts/fluid-dataprocess/common/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ spec:
role: dataprocess-pod
app: fluid-dataprocess
targetDataset: {{ required "targetDataset should be set" .Values.dataProcess.targetDataset }}
{{- include "library.fluid.labels" . | nindent 8 }}
{{- if .Values.dataProcess.labels }}
{{ toYaml .Values.dataProcess.labels | nindent 8 }}
{{- end }}
Expand Down
2 changes: 1 addition & 1 deletion charts/fluid/fluid/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Create the name of the service account to use
Check if feature gate DataflowAffinity is enabled in the featureGates.
*/}}
{{- define "fluid.dataflowAffinity.enabled" -}}
{{- $featureGates := splitList "," .Values.dataset.featureGates }}
{{- $featureGates := splitList "," .Values.fluidapp.featureGates }}
{{- $found := false -}}
{{- range $idx, $featureGate := $featureGates }}
{{- $featureGateKV := splitList "=" $featureGate }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ spec:
- --pprof-addr=:6060
- --enable-leader-election
- --leader-election-namespace={{ include "fluid.namespace" . }}
- --feature-gates={{ .Values.dataset.featureGates }}
env:
{{- if .Values.workdir }}
- name: FLUID_WORKDIR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ spec:
- --pprof-addr=:6060
- --enable-leader-election
- --leader-election-namespace={{ include "fluid.namespace" . }}
- --feature-gates={{ .Values.fluidapp.featureGates }}
env:
{{- if .Values.workdir }}
- name: FLUID_WORKDIR
Expand Down
10 changes: 0 additions & 10 deletions charts/fluid/fluid/templates/role/dataset/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,6 @@ rules:
- get
- list
- watch
{{- if eq (include "fluid.dataflowAffinity.enabled" . ) "true" }}
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
{{- end }}
- apiGroups:
- ""
resources:
Expand Down
19 changes: 19 additions & 0 deletions charts/fluid/fluid/templates/role/fluidapp/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,25 @@ rules:
- get
- watch
- create
{{- if eq (include "fluid.dataflowAffinity.enabled" . ) "true" }}
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- batch
resources:
- jobs
verbs:
- get
- list
- watch
- update
{{- end }}
- apiGroups:
- ""
resources:
Expand Down
3 changes: 2 additions & 1 deletion charts/fluid/fluid/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ dataset:
- operator: Exists
controller:
image: fluidcloudnative/dataset-controller:v1.0.1-b33c500
featureGates: "DataflowAffinity=false"

csi:
tolerations:
Expand Down Expand Up @@ -204,10 +203,12 @@ webhook:
required:
- fluid.io/node
fluidapp:
enabled: true
replicas: 1
tolerations:
- operator: Exists
controller:
image: fluidcloudnative/application-controller:v1.0.1-b33c500
featureGates: "DataflowAffinity=false"
3 changes: 0 additions & 3 deletions cmd/dataset/app/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package app

import (
utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature"
"os"
"time"

Expand Down Expand Up @@ -101,8 +100,6 @@ func init() {
datasetCmd.Flags().StringVar(&controllerWorkqueueMaxSyncBackoffStr, "workqueue-max-sync-backoff", "1000s", "max backoff period for failed reconciliation in controller's workqueue")
datasetCmd.Flags().IntVar(&controllerWorkqueueQPS, "workqueue-qps", 10, "qps limit value for controller's workqueue")
datasetCmd.Flags().IntVar(&controllerWorkqueueBurst, "workqueue-burst", 100, "burst limit value for controller's workqueue")

utilfeature.DefaultMutableFeatureGate.AddFlag(datasetCmd.Flags())
}

func handle() {
Expand Down
47 changes: 45 additions & 2 deletions cmd/fluidapp/app/fluidapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@
package app

import (
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/fluidapp/dataflowaffinity"
"github.com/fluid-cloudnative/fluid/pkg/dataflow"
utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/labels"
"os"
"sigs.k8s.io/controller-runtime/pkg/cache"

"github.com/fluid-cloudnative/fluid"
"github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/fluidapp"
Expand Down Expand Up @@ -62,6 +69,8 @@ func init() {
fluidAppCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for fluid controller.")
fluidAppCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results")
fluidAppCmd.Flags().IntVar(&maxConcurrentReconciles, "runtime-workers", 3, "Set max concurrent workers for Fluid App controller")

utilfeature.DefaultMutableFeatureGate.AddFlag(fluidAppCmd.Flags())
}

func handle() {
Expand Down Expand Up @@ -89,7 +98,7 @@ func handle() {
LeaderElectionNamespace: leaderElectionNamespace,
LeaderElectionID: "fluidapp.data.fluid.io",
Port: 9443,
NewCache: fluidapp.NewCache(scheme),
NewCache: NewCache(scheme),
})
if err != nil {
setupLog.Error(err, "unable to start fluid app manager")
Expand All @@ -98,7 +107,6 @@ func handle() {

controllerOptions := controller.Options{
MaxConcurrentReconciles: maxConcurrentReconciles,
// Log: ctrl.Log.WithName("appctrl"),
}
if err = (fluidapp.NewFluidAppReconciler(
mgr.GetClient(),
Expand All @@ -109,9 +117,44 @@ func handle() {
os.Exit(1)
}

if dataflow.Enabled(dataflow.DataflowAffinity) {
if err = (dataflowaffinity.NewDataOpJobReconciler(
mgr.GetClient(),
ctrl.Log.WithName("dataopctrl"),
mgr.GetEventRecorderFor("DataOpJob"),
)).SetupWithManager(mgr, controllerOptions); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DataOpJob")
os.Exit(1)
}
}

setupLog.Info("starting fluidapp-controller")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running fluidapp-controller")
os.Exit(1)
}
}

func NewCache(scheme *runtime.Scheme) cache.NewCacheFunc {
options := cache.Options{
Scheme: scheme,
SelectorsByObject: cache.SelectorsByObject{
&corev1.Pod{}: {
Label: labels.SelectorFromSet(labels.Set{
// watch pods managed by fluid, like data operation pods, serverless app pods.
common.LabelAnnotationManagedBy: common.Fluid,
}),
},
},
}
if dataflow.Enabled(dataflow.DataflowAffinity) {
options.SelectorsByObject[&batchv1.Job{}] = cache.ObjectSelector{
// watch data operation job
Label: labels.SelectorFromSet(labels.Set{
// only data operations create job resource and the jobs created by cronjob do not have this label.
common.LabelAnnotationManagedBy: common.Fluid,
}),
}
}
return cache.BuilderWithOptions(options)
}
3 changes: 2 additions & 1 deletion docs/en/samples/application_controller.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jfsdemo [Calculating] N/A N/A Bound

**Create Job**

To use Fluid in a serverless scenario, you need to add the `serverless.fluid.io/inject: "true"` label to the application pod. as follows:
To use Fluid in a serverless scenario, you need to add the `serverless.fluid.io/inject: "true"` and `fluid.io/managed-by: fluid` label to the application pod. as follows:

```yaml
$ cat<<EOF >sample.yaml
Expand All @@ -58,6 +58,7 @@ spec:
metadata:
labels:
serverless.fluid.io/inject: "true"
fluid.io/managed-by: fluid
spec:
containers:
- name: demo
Expand Down
3 changes: 2 additions & 1 deletion docs/zh/samples/application_controller.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jfsdemo [Calculating] N/A N/A Bound

**创建 Job 资源对象**

在 Serverless 场景使用 Fluid,需要在应用 Pod 中添加 `serverless.fluid.io/inject: "true"` label。如下:
在 Serverless 场景使用 Fluid,需要在应用 Pod 中添加 `serverless.fluid.io/inject: "true"``fluid.io/managed-by: fluid` label。如下:

```yaml
$ cat<<EOF >sample.yaml
Expand All @@ -52,6 +52,7 @@ spec:
metadata:
labels:
serverless.fluid.io/inject: "true"
fluid.io/managed-by: fluid
spec:
containers:
- name: demo
Expand Down
7 changes: 6 additions & 1 deletion pkg/common/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
// LabelAnnotationManagedByDeprecated is a deprecated label key for LabelAnnotationManagedBy
LabelAnnotationManagedByDeprecated = LabelAnnotationPrefix + "wrapped-by"

// LabelAnnotationManagedBy indicates a pvc that is managed by Fluid
// LabelAnnotationManagedBy indicates a resource(like pvc) that is managed by Fluid
LabelAnnotationManagedBy = LabelAnnotationPrefix + "managed-by"

// fluid adminssion webhook inject flag
Expand All @@ -48,6 +48,11 @@ const (

// LabelNodePublishMothod is a pv label that indicates the method nodePuhlishVolume use
LabelNodePublishMothod = LabelAnnotationPrefix + "node-publish-method"

// AnnotationDataFlowAffinityInject is an annotation representing enabled the dataflow affinity injection
AnnotationDataFlowAffinityInject = LabelAnnotationPrefix + "dataflow-affinity.inject"
// LabelDataFlowAffinityPrefix is a prefix for customized dataflow affinity label name.
LabelDataFlowAffinityPrefix = "fluid.io."
)

var (
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/v1alpha1/databackup/implement.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func (r *dataBackupOperation) SetTargetDatasetStatusInProgress(dataset *datav1al
func (r *dataBackupOperation) RemoveTargetDatasetStatusInProgress(dataset *datav1alpha1.Dataset) {
}

func (r *dataBackupOperation) GetOperationType() datav1alpha1.OperationType {
return datav1alpha1.DataBackupType
func (r *dataBackupOperation) GetOperationType() dataoperation.OperationType {
return dataoperation.DataBackupType
}

func (r *dataBackupOperation) GetTargetDataset() (*datav1alpha1.Dataset, error) {
Expand Down
11 changes: 1 addition & 10 deletions pkg/controllers/v1alpha1/databackup/status_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package databackup

import (
"fmt"
"github.com/fluid-cloudnative/fluid/pkg/dataflow"
"time"

"github.com/fluid-cloudnative/fluid/api/v1alpha1"
Expand Down Expand Up @@ -54,14 +52,7 @@ func (o *OnceHandler) GetOperationStatus(ctx runtime.ReconcileRequestContext, op
return
}

// set the node labels in status when job finished
if dataflow.Enabled(dataflow.DataflowAffinity) && result.NodeAffinity == nil {
// generate the node labels
result.NodeAffinity, err = dataflow.GenerateNodeAffinity(ctx.Client, backupPod)
if err != nil {
return nil, fmt.Errorf("error to generate the node labels: %v", err)
}
}
// TODO: inject nodeaffinity like other data operations when using job instead of pod

var finishTime time.Time
if len(backupPod.Status.Conditions) != 0 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/v1alpha1/dataload/implement.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (r *dataLoadOperation) GetChartsDirectory() string {
return utils.GetChartsDirectory() + "/" + cdataload.DataloadChart
}

func (r *dataLoadOperation) GetOperationType() datav1alpha1.OperationType {
return datav1alpha1.DataLoadType
func (r *dataLoadOperation) GetOperationType() dataoperation.OperationType {
return dataoperation.DataLoadType
}

func (r *dataLoadOperation) UpdateOperationApiStatus(opStatus *datav1alpha1.OperationStatus) error {
Expand Down
Loading

0 comments on commit 620babc

Please sign in to comment.