Commit

Add an operator e2e exclusion test when a single log process has high latency (#2138)
johscheuer authored Oct 7, 2024
1 parent c67e073 commit 0dff9ad
Showing 3 changed files with 134 additions and 87 deletions.
30 changes: 30 additions & 0 deletions e2e/fixtures/chaos_network.go
@@ -171,3 +171,33 @@ func (factory *Factory) InjectPartitionOnSomeTargetPods(
Selector: target,
}, chaosmesh.Both, nil)
}

// InjectNetworkLatency injects network latency between the source and the target.
func (factory *Factory) InjectNetworkLatency(source chaosmesh.PodSelectorSpec, target chaosmesh.PodSelectorSpec, direction chaosmesh.Direction, delay *chaosmesh.DelaySpec) *ChaosMeshExperiment {
ensurePodPhaseSelectorIsSet(&source)
ensurePodPhaseSelectorIsSet(&target)

return factory.CreateExperiment(&chaosmesh.NetworkChaos{
ObjectMeta: metav1.ObjectMeta{
Name: factory.RandStringRunes(32),
Namespace: factory.GetChaosNamespace(),
Labels: factory.GetDefaultLabels(),
},
Spec: chaosmesh.NetworkChaosSpec{
Action: chaosmesh.DelayAction,
Duration: pointer.String(ChaosDurationForever),
PodSelector: chaosmesh.PodSelector{
Selector: source,
Mode: chaosmesh.AllMode,
},
Target: &chaosmesh.PodSelector{
Mode: chaosmesh.AllMode,
Selector: target,
},
Direction: direction,
TcParameter: chaosmesh.TcParameter{
Delay: delay,
},
},
})
}
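
For reference, a minimal sketch of how a test could call the new fixture, mirroring the invocation added in operator_ha_test.go below. It assumes it runs inside an e2e test where factory, fixtures, chaosmesh, and fdbCluster are already in scope; the targetPod variable and the 500ms delay are illustrative assumptions, not part of this commit.

// Sketch only: inject symmetric latency between one Pod and the rest of the cluster.
// "targetPod" and the 500ms delay are hypothetical values chosen for illustration.
experiment := factory.InjectNetworkLatency(
	fixtures.PodSelector(&targetPod),
	chaosmesh.PodSelectorSpec{
		GenericSelectorSpec: chaosmesh.GenericSelectorSpec{
			Namespaces:     []string{fdbCluster.Namespace()},
			LabelSelectors: fdbCluster.GetCachedCluster().GetMatchLabels(),
		},
	},
	chaosmesh.Both,
	&chaosmesh.DelaySpec{
		Latency:     "500ms",
		Correlation: "100",
		Jitter:      "0",
	},
)
// Clean up the experiment when the test is done.
defer factory.DeleteChaosMeshExperimentSafe(experiment)
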
88 changes: 1 addition & 87 deletions e2e/test_operator/operator_test.go
@@ -89,7 +89,7 @@ var _ = BeforeSuite(func() {
operatorPod := factory.RandomPickOnePod(factory.GetOperatorPods(fdbCluster.Namespace()).Items)
Expect(factory.GetLogsForPod(&operatorPod, "manager", nil)).NotTo(BeEmpty())

//Load some data async into the cluster. We will only block as long as the Job is created.
// Load some data async into the cluster. We will only block as long as the Job is created.
factory.CreateDataLoaderIfAbsent(fdbCluster)

// In order to test the robustness of the operator we try to kill the operator Pods every minute.
@@ -377,92 +377,6 @@ var _ = Describe("Operator", Label("e2e", "pr"), func() {
// fields that we expect are actually set.
})

PWhen("replacing log Pod with high queue", func() {
var experiment *fixtures.ChaosMeshExperiment

BeforeEach(func() {
spec := fdbCluster.GetCluster().Spec.DeepCopy()
spec.AutomationOptions.UseLocalitiesForExclusion = pointer.Bool(true)
fdbCluster.UpdateClusterSpecWithSpec(spec)
Expect(fdbCluster.GetCluster().UseLocalitiesForExclusion()).To(BeTrue())

// Until the race condition is resolved in the FDB go bindings make sure the operator is not restarted.
// See: https://github.com/apple/foundationdb/issues/11222
// We can remove this once 7.1 is the default version.
factory.DeleteChaosMeshExperimentSafe(scheduleInjectPodKill)
status := fdbCluster.GetStatus()

var processGroupID fdbv1beta2.ProcessGroupID
for _, process := range status.Cluster.Processes {
var isLog bool
for _, role := range process.Roles {
if role.Role == "log" {
isLog = true
break
}
}

if !isLog {
continue
}

processGroupID = fdbv1beta2.ProcessGroupID(process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey])
break
}

var replacedPod corev1.Pod
for _, pod := range fdbCluster.GetLogPods().Items {
if fixtures.GetProcessGroupID(pod) != processGroupID {
continue
}

replacedPod = pod
break
}

log.Println("Inject chaos")
experiment = factory.InjectPodStress(fixtures.PodSelector(&replacedPod), []string{fdbv1beta2.MainContainerName}, nil, &chaosmesh.CPUStressor{
Stressor: chaosmesh.Stressor{
Workers: 1,
},
Load: pointer.Int(80),
})

factory.CreateDataLoaderIfAbsent(fdbCluster)

time.Sleep(1 * time.Minute)
log.Println("replacedPod", replacedPod.Name, "useLocalitiesForExclusion", fdbCluster.GetCluster().UseLocalitiesForExclusion())
fdbCluster.ReplacePod(replacedPod, true)
})

It("should exclude the server", func() {
Eventually(func() []fdbv1beta2.ExcludedServers {
status := fdbCluster.GetStatus()
excludedServers := status.Cluster.DatabaseConfiguration.ExcludedServers
log.Println("excludedServers", excludedServers)
return excludedServers
}).WithTimeout(15 * time.Minute).WithPolling(1 * time.Second).Should(BeEmpty())
})

AfterEach(func() {
Expect(fdbCluster.ClearProcessGroupsToRemove()).NotTo(HaveOccurred())
factory.DeleteChaosMeshExperimentSafe(experiment)
// Making sure we included back all the process groups after exclusion is complete.
Expect(fdbCluster.GetStatus().Cluster.DatabaseConfiguration.ExcludedServers).To(BeEmpty())

if factory.ChaosTestsEnabled() {
scheduleInjectPodKill = factory.ScheduleInjectPodKillWithName(
fixtures.GetOperatorSelector(fdbCluster.Namespace()),
"*/2 * * * *",
chaosmesh.OneMode,
fdbCluster.Namespace()+"-"+fdbCluster.Name(),
)
}

factory.DeleteDataLoader(fdbCluster)
})
})

When("replacing a coordinator Pod", func() {
var replacedPod corev1.Pod
var useLocalitiesForExclusion bool
103 changes: 103 additions & 0 deletions e2e/test_operator_ha/operator_ha_test.go
@@ -34,6 +34,8 @@ import (
"strconv"
"time"

"k8s.io/utils/pointer"

fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
"github.com/FoundationDB/fdb-kubernetes-operator/e2e/fixtures"
"github.com/FoundationDB/fdb-kubernetes-operator/pkg/fdbstatus"
@@ -211,4 +213,105 @@ var _ = Describe("Operator HA tests", Label("e2e", "pr"), func() {
}).WithTimeout(10 * time.Minute).WithPolling(2 * time.Second).Should(BeNumerically(">=", desiredRunningPods))
})
})

When("locality based exclusions are enabled", func() {
var initialUseLocalitiesForExclusion bool

BeforeEach(func() {
spec := fdbCluster.GetRemote().GetCluster().Spec.DeepCopy()
initialUseLocalitiesForExclusion = fdbCluster.GetRemote().GetCluster().UseLocalitiesForExclusion()
spec.AutomationOptions.UseLocalitiesForExclusion = pointer.Bool(true)
fdbCluster.GetRemote().UpdateClusterSpecWithSpec(spec)
Expect(fdbCluster.GetRemote().GetCluster().UseLocalitiesForExclusion()).To(BeTrue())
})

AfterEach(func() {
spec := fdbCluster.GetRemote().GetCluster().Spec.DeepCopy()
spec.AutomationOptions.UseLocalitiesForExclusion = pointer.Bool(initialUseLocalitiesForExclusion)
fdbCluster.GetRemote().UpdateClusterSpecWithSpec(spec)
})

When("when a remote log has network latency issues and gets replaced", func() {
var experiment *fixtures.ChaosMeshExperiment

BeforeEach(func() {
dcID := fdbCluster.GetRemote().GetCluster().Spec.DataCenter

status := fdbCluster.GetPrimary().GetStatus()

var processGroupID fdbv1beta2.ProcessGroupID
for _, process := range status.Cluster.Processes {
dc, ok := process.Locality[fdbv1beta2.FDBLocalityDCIDKey]
if !ok || dc != dcID {
continue
}

var isLog bool
for _, role := range process.Roles {
if role.Role == "log" {
isLog = true
break
}
}

if !isLog {
continue
}

processGroupID = fdbv1beta2.ProcessGroupID(process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey])
break
}

log.Println("Will inject chaos into", processGroupID, "and replace it")
var replacedPod corev1.Pod
for _, pod := range fdbCluster.GetRemote().GetLogPods().Items {
if fixtures.GetProcessGroupID(pod) != processGroupID {
continue
}

replacedPod = pod
break
}

log.Println("Inject latency chaos")
experiment = factory.InjectNetworkLatency(
fixtures.PodSelector(&replacedPod),
chaosmesh.PodSelectorSpec{
GenericSelectorSpec: chaosmesh.GenericSelectorSpec{
Namespaces: []string{fdbCluster.GetRemote().Namespace()},
LabelSelectors: fdbCluster.GetRemote().GetCachedCluster().GetMatchLabels(),
},
}, chaosmesh.Both,
&chaosmesh.DelaySpec{
Latency: "250ms",
Correlation: "100",
Jitter: "0",
})

// TODO (johscheuer): Allow this to run as a long-running task until the test is done.
factory.CreateDataLoaderIfAbsentWithWait(fdbCluster.GetPrimary(), false)

time.Sleep(1 * time.Minute)
log.Println("replacedPod", replacedPod.Name, "useLocalitiesForExclusion", fdbCluster.GetPrimary().GetCluster().UseLocalitiesForExclusion())
fdbCluster.GetRemote().ReplacePod(replacedPod, true)
})

It("should exclude and remove the pod", func() {
Eventually(func() []fdbv1beta2.ExcludedServers {
status := fdbCluster.GetPrimary().GetStatus()
excludedServers := status.Cluster.DatabaseConfiguration.ExcludedServers
log.Println("excludedServers", excludedServers)
return excludedServers
}).WithTimeout(15 * time.Minute).WithPolling(1 * time.Second).Should(BeEmpty())
})

AfterEach(func() {
Expect(fdbCluster.GetRemote().ClearProcessGroupsToRemove()).NotTo(HaveOccurred())
factory.DeleteChaosMeshExperimentSafe(experiment)
// Make sure all process groups are included again after the exclusion is complete.
Expect(fdbCluster.GetPrimary().GetStatus().Cluster.DatabaseConfiguration.ExcludedServers).To(BeEmpty())
factory.DeleteDataLoader(fdbCluster.GetPrimary())
})
})
})
})
