Bugfixes in release controller strategy executor
This commit addresses multiple issues identified in the strategy executor,
among them:
  * Wrong recipients for patch updates: due to an error in the code, some
  patches were applied to the wrong generation of release and target
  objects.
  * Target object spec checkers returned an incomplete spec when only
  some of the clusters were misbehaving, which risked de-scheduling the
  workload on healthy clusters (see the sketch below).
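To make the second issue concrete, here is a minimal standalone sketch; the cluster type and builder names are hypothetical, not the actual Shipper API. A checker that only emits entries for the misbehaving clusters returns a partial spec, while the fixed shape carries every cluster over, patched or not.

    // Hypothetical, self-contained illustration of the incomplete-spec bug.
    package main

    import "fmt"

    type cluster struct {
    	Name    string
    	Percent int32
    }

    // Buggy shape: only clusters that need a correction end up in the
    // returned spec. If this partial spec is applied as the new desired
    // state, every healthy cluster drops out of it and its workload is
    // de-scheduled.
    func buildSpecBuggy(clusters []cluster, step int32) []cluster {
    	var out []cluster
    	for _, c := range clusters {
    		if c.Percent != step {
    			out = append(out, cluster{Name: c.Name, Percent: step})
    		}
    	}
    	return out
    }

    // Fixed shape (mirroring the checks.go diff below): every cluster is
    // appended, patched or not, so the spec stays complete.
    func buildSpecFixed(clusters []cluster, step int32) []cluster {
    	out := make([]cluster, 0, len(clusters))
    	for _, c := range clusters {
    		t := c
    		if c.Percent != step {
    			t = cluster{Name: c.Name, Percent: step}
    		}
    		out = append(out, t)
    	}
    	return out
    }

    func main() {
    	cs := []cluster{{Name: "healthy", Percent: 100}, {Name: "broken", Percent: 40}}
    	fmt.Println(buildSpecBuggy(cs, 100)) // [{broken 100}]: the healthy cluster vanished
    	fmt.Println(buildSpecFixed(cs, 100)) // [{healthy 100} {broken 100}]
    }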

Signed-off-by: Oleg Sidorov <[email protected]>
Oleg Sidorov authored and juliogreff committed Feb 20, 2020
1 parent e53f506 commit 18e1839
Showing 6 changed files with 291 additions and 93 deletions.
30 changes: 16 additions & 14 deletions pkg/controller/release/checks.go
@@ -26,20 +26,25 @@ func checkCapacity(

clustersNotReadyMap := make(map[string]struct{})
for _, spec := range ct.Spec.Clusters {
t := spec
if spec.Percent != stepCapacity {
t := shipper.ClusterCapacityTarget{
t = shipper.ClusterCapacityTarget{
Name: spec.Name,
Percent: stepCapacity,
TotalReplicaCount: spec.TotalReplicaCount,
}
newSpec.Clusters = append(newSpec.Clusters, t)

clustersNotReadyMap[spec.Name] = struct{}{}
canProceed = false
}
newSpec.Clusters = append(newSpec.Clusters, t)
}

if canProceed {
// We return an empty new spec if cluster spec check went fine
// so far.
newSpec = nil

if ct.Status.ObservedGeneration >= ct.Generation {
canProceed, reason = targetutil.IsReady(ct.Status.Conditions)
} else {
@@ -69,11 +74,7 @@ func checkCapacity(
reason = fmt.Sprintf("%v", clustersNotReady)
}

if len(newSpec.Clusters) > 0 {
return canProceed, newSpec, reason
} else {
return canProceed, nil, reason
}
return canProceed, newSpec, reason
}

func checkTraffic(
@@ -90,19 +91,24 @@ func checkTraffic(

clustersNotReadyMap := make(map[string]struct{})
for _, spec := range tt.Spec.Clusters {
t := spec
if spec.Weight != stepTrafficWeight {
t := shipper.ClusterTrafficTarget{
t = shipper.ClusterTrafficTarget{
Name: spec.Name,
Weight: stepTrafficWeight,
}
newSpec.Clusters = append(newSpec.Clusters, t)

clustersNotReadyMap[spec.Name] = struct{}{}
canProceed = false
}
newSpec.Clusters = append(newSpec.Clusters, t)
}

if canProceed {
// We return an empty new spec if cluster spec check went fine
// so far.
newSpec = nil

if tt.Status.ObservedGeneration >= tt.Generation {
canProceed, reason = targetutil.IsReady(tt.Status.Conditions)
} else {
@@ -132,9 +138,5 @@ func checkTraffic(
reason = fmt.Sprintf("%v", clustersNotReady)
}

if len(newSpec.Clusters) > 0 {
return canProceed, newSpec, reason
} else {
return canProceed, nil, reason
}
return canProceed, newSpec, reason
}
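Pieced together from the hunks above, the fixed loop in checkCapacity now reads as follows (surrounding declarations and returns elided); checkTraffic follows the same shape with Weight in place of Percent and no TotalReplicaCount:

    clustersNotReadyMap := make(map[string]struct{})
    for _, spec := range ct.Spec.Clusters {
    	t := spec
    	if spec.Percent != stepCapacity {
    		t = shipper.ClusterCapacityTarget{
    			Name:              spec.Name,
    			Percent:           stepCapacity,
    			TotalReplicaCount: spec.TotalReplicaCount,
    		}
    		clustersNotReadyMap[spec.Name] = struct{}{}
    		canProceed = false
    	}
    	// Every cluster is appended, patched or not, so newSpec is never partial.
    	newSpec.Clusters = append(newSpec.Clusters, t)
    }

    if canProceed {
    	// All cluster specs already match the target step; a nil spec tells
    	// the caller that no patch is needed, replacing the old
    	// len(newSpec.Clusters) > 0 check at return time.
    	newSpec = nil
    }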
3 changes: 1 addition & 2 deletions pkg/controller/release/release_controller.go
@@ -4,10 +4,10 @@ import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/labels"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -476,7 +476,6 @@ func (c *Controller) executeReleaseStrategy(relinfo *releaseInfo, diff *diffutil
} else {
achievedStep = int32(len(rel.Spec.Environment.Strategy.Steps)) - 1
achievedStepName = rel.Spec.Environment.Strategy.Steps[achievedStep].Name

}
if prevStep == nil || achievedStep != prevStep.Step {
rel.Status.AchievedStep = &shipper.AchievedStep{
210 changes: 191 additions & 19 deletions pkg/controller/release/release_controller_test.go
@@ -1242,7 +1242,8 @@ func (f *fixture) expectCapacityNotReady(relpair releaseInfoPair, targetStep, ac
// }
// }

var rel *shipper.Release
rel := relpair.contender.release

if role == Contender {
newStatus = map[string]interface{}{
"status": shipper.ReleaseStatus{
@@ -1275,8 +1276,6 @@ func (f *fixture) expectCapacityNotReady(relpair releaseInfoPair, targetStep, ac
},
},
}
rel = relpair.contender.release

} else {
newStatus = map[string]interface{}{
"status": shipper.ReleaseStatus{
@@ -1324,8 +1323,6 @@ func (f *fixture) expectCapacityNotReady(relpair releaseInfoPair, targetStep, ac
},
},
}

rel = relpair.incumbent.release
}

patch, _ := json.Marshal(newStatus)
@@ -1350,7 +1347,8 @@ func (f *fixture) expectTrafficNotReady(relpair releaseInfoPair, targetStep, ach
// }
// }

var rel *shipper.Release
rel := relpair.contender.release

if role == Contender {
newStatus = map[string]interface{}{
"status": shipper.ReleaseStatus{
@@ -1388,8 +1386,6 @@ func (f *fixture) expectTrafficNotReady(relpair releaseInfoPair, targetStep, ach
},
},
}

rel = relpair.contender.release
} else {
newStatus = map[string]interface{}{
"status": shipper.ReleaseStatus{
@@ -1432,8 +1428,6 @@ func (f *fixture) expectTrafficNotReady(relpair releaseInfoPair, targetStep, ach
},
},
}

rel = relpair.incumbent.release
}

patch, _ := json.Marshal(newStatus)
@@ -1483,15 +1477,15 @@ func TestContenderReleasePhaseIsWaitingForCommandForInitialStepState(t *testing.
incumbent.capacityTarget.Spec.Clusters[0].Percent = 100

f.addObjects(
incumbent.release.DeepCopy(),
incumbent.installationTarget.DeepCopy(),
incumbent.capacityTarget.DeepCopy(),
incumbent.trafficTarget.DeepCopy(),

contender.release.DeepCopy(),
contender.installationTarget.DeepCopy(),
contender.capacityTarget.DeepCopy(),
contender.trafficTarget.DeepCopy(),

incumbent.release.DeepCopy(),
incumbent.installationTarget.DeepCopy(),
incumbent.capacityTarget.DeepCopy(),
incumbent.trafficTarget.DeepCopy(),
)
var step int32 = 0
f.expectReleaseWaitingForCommand(contender.release, step)
@@ -1950,7 +1944,7 @@ func TestIncumbentTrafficShouldDecrease(t *testing.T) {
)

tt := incumbent.trafficTarget.DeepCopy()
r := incumbent.release.DeepCopy()
r := contender.release.DeepCopy()
f.expectTrafficStatusPatch(contender.release.Spec.TargetStep, tt, r, 50, Incumbent)
f.run()
}
@@ -1991,7 +1985,7 @@ func TestIncumbentTrafficShouldDecreaseWithRolloutBlockOverride(t *testing.T) {
)

tt := incumbent.trafficTarget.DeepCopy()
r := incumbent.release.DeepCopy()
r := contender.release.DeepCopy()
f.expectTrafficStatusPatch(contender.release.Spec.TargetStep, tt, r, 50, Incumbent)
overrideEvent := fmt.Sprintf("%s RolloutBlockOverridden %s", corev1.EventTypeNormal, rolloutBlockKey)
f.expectedEvents = append([]string{overrideEvent}, f.expectedEvents...)
@@ -2079,7 +2073,7 @@ func TestIncumbentCapacityShouldDecrease(t *testing.T) {
)

tt := incumbent.capacityTarget.DeepCopy()
r := incumbent.release.DeepCopy()
r := contender.release.DeepCopy()
f.expectCapacityStatusPatch(contender.release.Spec.TargetStep, tt, r, 50, uint(totalReplicaCount), Incumbent)
f.run()
}
@@ -2122,7 +2116,7 @@ func TestIncumbentCapacityShouldDecreaseWithRolloutBlockOverride(t *testing.T) {
)

tt := incumbent.capacityTarget.DeepCopy()
r := incumbent.release.DeepCopy()
r := contender.release.DeepCopy()
f.expectCapacityStatusPatch(contender.release.Spec.TargetStep, tt, r, 50, uint(totalReplicaCount), Incumbent)
overrideEvent := fmt.Sprintf("%s RolloutBlockOverridden %s", corev1.EventTypeNormal, rolloutBlockKey)
f.expectedEvents = append([]string{overrideEvent}, f.expectedEvents...)
@@ -2829,3 +2823,181 @@ func TestIncumbentOutOfRangeTargetStep(t *testing.T) {

f.run()
}

func TestUnhealthyTrafficAndCapacityIncumbentConvergesConsistently(t *testing.T) {
namespace := "test-namespace"
incumbentName, contenderName := "test-incumbent", "test-contender"
app := buildApplication(namespace, "test-app")
cluster := buildCluster("minikube")
brokenCluster := buildCluster("broken-cluster")

f := newFixture(t, app.DeepCopy(), cluster.DeepCopy())
replicaCount := int32(4)

contender := f.buildContender(namespace, contenderName, replicaCount)
incumbent := f.buildIncumbent(namespace, incumbentName, replicaCount)

addCluster(incumbent, brokenCluster)

// Mark contender as fully healthy
var step int32 = 2
contender.release.Spec.TargetStep = step
contender.capacityTarget.Spec.Clusters[0].Percent = 100
contender.capacityTarget.Spec.Clusters[0].TotalReplicaCount = replicaCount
contender.trafficTarget.Spec.Clusters[0].Weight = 100
contender.release.Status.AchievedStep = &shipper.AchievedStep{Step: 2}

incumbent.trafficTarget.Spec.Clusters[0].Weight = 0
incumbent.trafficTarget.Spec.Clusters[1].Weight = 0
incumbent.trafficTarget.Status.Clusters = []*shipper.ClusterTrafficStatus{
{
AchievedTraffic: 50,
},
{
AchievedTraffic: 0,
},
}
incumbent.trafficTarget.Status.Conditions, _ = targetutil.SetTargetCondition(
incumbent.trafficTarget.Status.Conditions,
targetutil.NewTargetCondition(
shipper.TargetConditionTypeReady,
corev1.ConditionFalse,
ClustersNotReady, "[broken-cluster]",
),
)
incumbent.capacityTarget.Spec.Clusters[0].Name = "broken-cluster"
incumbent.capacityTarget.Spec.Clusters[0].Percent = 0
incumbent.capacityTarget.Spec.Clusters[0].TotalReplicaCount = replicaCount
incumbent.capacityTarget.Spec.Clusters[1].Name = "minikube"
incumbent.capacityTarget.Spec.Clusters[1].Percent = 0
incumbent.capacityTarget.Spec.Clusters[1].TotalReplicaCount = 0
incumbent.capacityTarget.Status.Conditions, _ = targetutil.SetTargetCondition(
incumbent.capacityTarget.Status.Conditions,
targetutil.NewTargetCondition(
shipper.TargetConditionTypeReady,
corev1.ConditionFalse,
ClustersNotReady, "[broken-cluster]"))
incumbent.capacityTarget.Status.Clusters = []shipper.ClusterCapacityStatus{
{
AvailableReplicas: 42, // anything but spec-matching
AchievedPercent: 42, // anything but spec-matching
},
{
AvailableReplicas: 0,
AchievedPercent: 0,
},
}

expected := contender.release.DeepCopy()
condScheduled := releaseutil.NewReleaseCondition(shipper.ReleaseConditionTypeScheduled, corev1.ConditionTrue, "", "")
releaseutil.SetReleaseCondition(&expected.Status, *condScheduled)
condStrategyExecuted := releaseutil.NewReleaseCondition(shipper.ReleaseConditionTypeStrategyExecuted, corev1.ConditionTrue, "", "")
releaseutil.SetReleaseCondition(&expected.Status, *condStrategyExecuted)

f.addObjects(
contender.release.DeepCopy(),
contender.installationTarget.DeepCopy(),
contender.capacityTarget.DeepCopy(),
contender.trafficTarget.DeepCopy(),

incumbent.release.DeepCopy(),
incumbent.installationTarget.DeepCopy(),
incumbent.capacityTarget.DeepCopy(),
incumbent.trafficTarget.DeepCopy(),
)

f.actions = append(f.actions,
kubetesting.NewUpdateAction(
shipper.SchemeGroupVersion.WithResource("releases"),
contender.release.GetNamespace(),
expected))

var patch []byte

newContenderStatus := map[string]interface{}{
"status": shipper.ReleaseStatus{
Strategy: &shipper.ReleaseStrategyStatus{
State: shipper.ReleaseStrategyState{
WaitingForInstallation: shipper.StrategyStateFalse,
WaitingForCommand: shipper.StrategyStateFalse,
WaitingForTraffic: shipper.StrategyStateTrue,
WaitingForCapacity: shipper.StrategyStateFalse,
},
Conditions: []shipper.ReleaseStrategyCondition{
shipper.ReleaseStrategyCondition{
Type: shipper.StrategyConditionContenderAchievedCapacity,
Status: corev1.ConditionTrue,
Step: step,
},
shipper.ReleaseStrategyCondition{
Type: shipper.StrategyConditionContenderAchievedInstallation,
Status: corev1.ConditionTrue,
Step: step,
},
shipper.ReleaseStrategyCondition{
Type: shipper.StrategyConditionContenderAchievedTraffic,
Status: corev1.ConditionTrue,
Step: step,
},
shipper.ReleaseStrategyCondition{
Type: shipper.StrategyConditionIncumbentAchievedTraffic,
Status: corev1.ConditionFalse,
Step: step,
Reason: ClustersNotReady,
Message: fmt.Sprintf("release \"test-incumbent\" hasn't achieved traffic in clusters: [broken-cluster]. for more details try `kubectl describe tt test-incumbent`"),
},
},
},
},
}
patch, _ = json.Marshal(newContenderStatus)

f.actions = append(f.actions, kubetesting.NewPatchAction(
shipper.SchemeGroupVersion.WithResource("releases"),
contender.release.GetNamespace(),
contender.release.GetName(),
types.MergePatchType,
patch,
))

newIncumbentStatus := map[string]interface{}{
"status": shipper.ReleaseStatus{
Strategy: &shipper.ReleaseStrategyStatus{
State: shipper.ReleaseStrategyState{
WaitingForInstallation: shipper.StrategyStateFalse,
WaitingForCommand: shipper.StrategyStateFalse,
WaitingForTraffic: shipper.StrategyStateFalse,
WaitingForCapacity: shipper.StrategyStateTrue,
},
Conditions: []shipper.ReleaseStrategyCondition{
shipper.ReleaseStrategyCondition{
Type: shipper.StrategyConditionContenderAchievedCapacity,
Status: corev1.ConditionFalse,
Step: step,
Reason: ClustersNotReady,
Message: fmt.Sprintf("release \"test-incumbent\" hasn't achieved capacity in clusters: [broken-cluster]. for more details try `kubectl describe ct test-incumbent`"),
},
shipper.ReleaseStrategyCondition{
Type: shipper.StrategyConditionContenderAchievedInstallation,
Status: corev1.ConditionTrue,
Step: step,
},
},
},
},
}
patch, _ = json.Marshal(newIncumbentStatus)

f.actions = append(f.actions, kubetesting.NewPatchAction(
shipper.SchemeGroupVersion.WithResource("releases"),
incumbent.release.GetNamespace(),
incumbent.release.GetName(),
types.MergePatchType,
patch,
))

f.expectedEvents = append(f.expectedEvents,
`Normal ReleaseConditionChanged [] -> [Scheduled True], [] -> [StrategyExecuted True]`)

f.run()
}