From c8e5665ece9ff97b81489f9f863b148512295f04 Mon Sep 17 00:00:00 2001 From: Johannes Scheuermann Date: Tue, 17 Sep 2024 09:57:13 +0200 Subject: [PATCH] Improve the tester handling when a cluster is upgraded (#2130) * Improve the tester handling when a cluster is upgraded --- controllers/bounce_processes.go | 46 +- controllers/bounce_processes_test.go | 1377 ++++++++++------- e2e/fixtures/fdb_cluster.go | 293 +++- .../operator_ha_upgrade_test.go | 35 + go.mod | 6 +- 5 files changed, 1118 insertions(+), 639 deletions(-) diff --git a/controllers/bounce_processes.go b/controllers/bounce_processes.go index b1613479..634b81fe 100644 --- a/controllers/bounce_processes.go +++ b/controllers/bounce_processes.go @@ -283,6 +283,37 @@ func getProcessesReadyForRestart(logger logr.Logger, cluster *fdbv1beta2.Foundat return addresses, nil } +// getUpgradeAddressesFromStatus will return the processes that can be upgraded and all the processes that are not ready to be upgraded. +func getUpgradeAddressesFromStatus(logger logr.Logger, status *fdbv1beta2.FoundationDBStatus, pendingUpgrades map[fdbv1beta2.ProcessGroupID]bool, version string) ([]fdbv1beta2.ProcessAddress, []string) { + notReadyProcesses := make([]string, 0) + addresses := make([]fdbv1beta2.ProcessAddress, 0, len(status.Cluster.Processes)) + for _, process := range status.Cluster.Processes { + // Ignore any tester processes as those are not restarted with the kill command. + if process.ProcessClass == fdbv1beta2.ProcessClassTest { + continue + } + + processID, ok := process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey] + if !ok { + logger.Info("Ignore process with missing locality field", "address", process.Address.String()) + continue + } + + if process.Version == version { + continue + } + + if pendingUpgrades[fdbv1beta2.ProcessGroupID(processID)] { + addresses = append(addresses, process.Address) + continue + } + + notReadyProcesses = append(notReadyProcesses, processID) + } + + return addresses, notReadyProcesses +} + // getAddressesForUpgrade checks that all processes in a cluster are ready to be // upgraded and returns the full list of addresses. 
func getAddressesForUpgrade(logger logr.Logger, r *FoundationDBClusterReconciler, status *fdbv1beta2.FoundationDBStatus, lockClient fdbadminclient.LockClient, cluster *fdbv1beta2.FoundationDBCluster, version fdbv1beta2.Version) ([]fdbv1beta2.ProcessAddress, *requeue) { @@ -300,20 +331,7 @@ func getAddressesForUpgrade(logger logr.Logger, r *FoundationDBClusterReconciler return nil, &requeue{message: "Deferring upgrade until database is available"} } - notReadyProcesses := make([]string, 0) - addresses := make([]fdbv1beta2.ProcessAddress, 0, len(status.Cluster.Processes)) - for _, process := range status.Cluster.Processes { - processID := process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey] - if process.Version == version.String() { - continue - } - if pendingUpgrades[fdbv1beta2.ProcessGroupID(processID)] { - addresses = append(addresses, process.Address) - } else { - notReadyProcesses = append(notReadyProcesses, processID) - } - } - + addresses, notReadyProcesses := getUpgradeAddressesFromStatus(logger, status, pendingUpgrades, version.String()) if len(notReadyProcesses) > 0 { logger.Info("Deferring upgrade until all processes are ready to be upgraded", "remainingProcesses", notReadyProcesses) message := fmt.Sprintf("Waiting for processes to be updated: %v", notReadyProcesses) diff --git a/controllers/bounce_processes_test.go b/controllers/bounce_processes_test.go index 6f442226..70374130 100644 --- a/controllers/bounce_processes_test.go +++ b/controllers/bounce_processes_test.go @@ -38,66 +38,48 @@ import ( ) var _ = Describe("bounceProcesses", func() { - var cluster *fdbv1beta2.FoundationDBCluster - var adminClient *mock.AdminClient - var lockClient *mock.LockClient - var requeue *requeue - var err error + When("bouncing processes", func() { + var cluster *fdbv1beta2.FoundationDBCluster + var adminClient *mock.AdminClient + var lockClient *mock.LockClient + var requeue *requeue + var err error - BeforeEach(func() { - cluster = internal.CreateDefaultCluster() - cluster.Spec.LockOptions.DisableLocks = pointer.Bool(false) - Expect(setupClusterForTest(cluster)).NotTo(HaveOccurred()) - - adminClient, err = mock.NewMockAdminClientUncast(cluster, k8sClient) - Expect(err).NotTo(HaveOccurred()) - - lockClient = mock.NewMockLockClientUncast(cluster) - }) - - JustBeforeEach(func() { - requeue = bounceProcesses{}.reconcile(context.TODO(), clusterReconciler, cluster, nil, globalControllerLogger) - }) + BeforeEach(func() { + cluster = internal.CreateDefaultCluster() + cluster.Spec.LockOptions.DisableLocks = pointer.Bool(false) + Expect(setupClusterForTest(cluster)).NotTo(HaveOccurred()) - Context("with a reconciled cluster", func() { - It("should not requeue", func() { + adminClient, err = mock.NewMockAdminClientUncast(cluster, k8sClient) Expect(err).NotTo(HaveOccurred()) - Expect(requeue).To(BeNil()) - }) - It("should not kill any processes", func() { - Expect(adminClient.KilledAddresses).To(BeEmpty()) + lockClient = mock.NewMockLockClientUncast(cluster) }) - }) - When("two processes have the IncorrectCommandLine condition", func() { - var pickedProcessGroups []*fdbv1beta2.ProcessGroupStatus - - BeforeEach(func() { - pickedProcessGroups = internal.PickProcessGroups(cluster, fdbv1beta2.ProcessClassStorage, 2) - - for _, processGroup := range pickedProcessGroups { - processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) - } + JustBeforeEach(func() { + requeue = bounceProcesses{}.reconcile(context.TODO(), clusterReconciler, cluster, nil, globalControllerLogger) }) - It("should not requeue", 
func() { - Expect(requeue).To(BeNil()) - }) + Context("with a reconciled cluster", func() { + It("should not requeue", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeNil()) + }) - It("should kill the targeted processes", func() { - addresses := make(map[string]fdbv1beta2.None, 2) - for _, processGroup := range pickedProcessGroups { - for _, address := range processGroup.Addresses { - addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} - } - } - Expect(adminClient.KilledAddresses).To(Equal(addresses)) + It("should not kill any processes", func() { + Expect(adminClient.KilledAddresses).To(BeEmpty()) + }) }) - When("one process is marked for removal", func() { + When("two processes have the IncorrectCommandLine condition", func() { + var pickedProcessGroups []*fdbv1beta2.ProcessGroupStatus + BeforeEach(func() { - pickedProcessGroups[0].MarkForRemoval() + pickedProcessGroups = internal.PickProcessGroups(cluster, fdbv1beta2.ProcessClassStorage, 2) + + for _, processGroup := range pickedProcessGroups { + processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) + } }) It("should not requeue", func() { @@ -105,7 +87,7 @@ var _ = Describe("bounceProcesses", func() { }) It("should kill the targeted processes", func() { - addresses := make(map[string]fdbv1beta2.None, 1) + addresses := make(map[string]fdbv1beta2.None, 2) for _, processGroup := range pickedProcessGroups { for _, address := range processGroup.Addresses { addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} @@ -113,129 +95,292 @@ var _ = Describe("bounceProcesses", func() { } Expect(adminClient.KilledAddresses).To(Equal(addresses)) }) + + When("one process is marked for removal", func() { + BeforeEach(func() { + pickedProcessGroups[0].MarkForRemoval() + }) + + It("should not requeue", func() { + Expect(requeue).To(BeNil()) + }) + + It("should kill the targeted processes", func() { + addresses := make(map[string]fdbv1beta2.None, 1) + for _, processGroup := range pickedProcessGroups { + for _, address := range processGroup.Addresses { + addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} + } + } + Expect(adminClient.KilledAddresses).To(Equal(addresses)) + }) + }) + + When("one process was manually excluded", func() { + BeforeEach(func() { + for _, address := range pickedProcessGroups[0].Addresses { + err := adminClient.ExcludeProcesses([]fdbv1beta2.ProcessAddress{{StringAddress: address, Port: 4501}}) + Expect(err).To(BeNil()) + } + }) + + It("should not requeue", func() { + Expect(requeue).To(BeNil()) + }) + + It("should kill the targeted processes", func() { + addresses := make(map[string]fdbv1beta2.None, 1) + for _, processGroup := range pickedProcessGroups { + for _, address := range processGroup.Addresses { + addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} + } + } + Expect(adminClient.KilledAddresses).To(Equal(addresses)) + }) + }) }) - When("one process was manually excluded", func() { + Context("with Pod in pending state", func() { + var pickedProcessGroups []*fdbv1beta2.ProcessGroupStatus + BeforeEach(func() { - for _, address := range pickedProcessGroups[0].Addresses { - err := adminClient.ExcludeProcesses([]fdbv1beta2.ProcessAddress{{StringAddress: address, Port: 4501}}) - Expect(err).To(BeNil()) + pickedProcessGroups = internal.PickProcessGroups(cluster, fdbv1beta2.ProcessClassStorage, 1) + + for _, processGroup := range pickedProcessGroups { + processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) + 
processGroup.UpdateCondition(fdbv1beta2.PodPending, true) } + + cluster.Spec.AutomationOptions.IgnorePendingPodsDuration = 1 * time.Nanosecond }) It("should not requeue", func() { Expect(requeue).To(BeNil()) }) - It("should kill the targeted processes", func() { - addresses := make(map[string]fdbv1beta2.None, 1) - for _, processGroup := range pickedProcessGroups { - for _, address := range processGroup.Addresses { - addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} - } + It("should not kill the pending process", func() { + for _, address := range pickedProcessGroups[0].Addresses { + addr := fmt.Sprintf("%s:4501", address) + Expect(adminClient.KilledAddresses).NotTo(HaveKey(addr)) } - Expect(adminClient.KilledAddresses).To(Equal(addresses)) }) }) - }) - Context("with Pod in pending state", func() { - var pickedProcessGroups []*fdbv1beta2.ProcessGroupStatus + When("a process group has the MissingProcess condition", func() { + var pickedProcessGroups []*fdbv1beta2.ProcessGroupStatus - BeforeEach(func() { - pickedProcessGroups = internal.PickProcessGroups(cluster, fdbv1beta2.ProcessClassStorage, 1) + BeforeEach(func() { + pickedProcessGroups = internal.PickProcessGroups(cluster, fdbv1beta2.ProcessClassStorage, 2) - for _, processGroup := range pickedProcessGroups { - processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) - processGroup.UpdateCondition(fdbv1beta2.PodPending, true) - } + for _, processGroup := range pickedProcessGroups { + processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) + } - cluster.Spec.AutomationOptions.IgnorePendingPodsDuration = 1 * time.Nanosecond - }) + pickedProcessGroups[0].ProcessGroupConditions = append(pickedProcessGroups[0].ProcessGroupConditions, &fdbv1beta2.ProcessGroupCondition{ + ProcessGroupConditionType: fdbv1beta2.MissingProcesses, + Timestamp: time.Now().Add(-2 * time.Minute).Unix(), + }) + }) - It("should not requeue", func() { - Expect(requeue).To(BeNil()) - }) + It("should not requeue", func() { + Expect(requeue).To(BeNil()) + }) - It("should not kill the pending process", func() { - for _, address := range pickedProcessGroups[0].Addresses { - addr := fmt.Sprintf("%s:4501", address) - Expect(adminClient.KilledAddresses).NotTo(HaveKey(addr)) - } + It("should not kill the missing process but all other processes", func() { + Expect(adminClient.KilledAddresses).To(HaveLen(1)) + for _, address := range pickedProcessGroups[0].Addresses { + addr := fmt.Sprintf("%s:4501", address) + Expect(adminClient.KilledAddresses).NotTo(HaveKey(addr)) + } + }) }) - }) - When("a process group has the MissingProcess condition", func() { - var pickedProcessGroups []*fdbv1beta2.ProcessGroupStatus + When("using multiple storage servers per pod", func() { + var pickedProcessGroups []*fdbv1beta2.ProcessGroupStatus - BeforeEach(func() { - pickedProcessGroups = internal.PickProcessGroups(cluster, fdbv1beta2.ProcessClassStorage, 2) + BeforeEach(func() { + cluster.Spec.StorageServersPerPod = 2 + Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred()) + result, err := reconcileCluster(cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + _, err = reloadCluster(cluster) + Expect(err).NotTo(HaveOccurred()) - for _, processGroup := range pickedProcessGroups { - processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) - } + pickedProcessGroups = internal.PickProcessGroups(cluster, fdbv1beta2.ProcessClassStorage, 2) + for _, processGroup := range pickedProcessGroups { + 
processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) + } + }) - pickedProcessGroups[0].ProcessGroupConditions = append(pickedProcessGroups[0].ProcessGroupConditions, &fdbv1beta2.ProcessGroupCondition{ - ProcessGroupConditionType: fdbv1beta2.MissingProcesses, - Timestamp: time.Now().Add(-2 * time.Minute).Unix(), + It("should not requeue", func() { + Expect(requeue).To(BeNil()) }) - }) - It("should not requeue", func() { - Expect(requeue).To(BeNil()) - }) + It("should kill the targeted processes", func() { + addresses := make(map[string]fdbv1beta2.None, 2) + for _, processGroup := range pickedProcessGroups { + for _, address := range processGroup.Addresses { + addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} + addresses[fmt.Sprintf("%s:4503", address)] = fdbv1beta2.None{} + } + } + Expect(adminClient.KilledAddresses).To(Equal(addresses)) + }) - It("should not kill the missing process but all other processes", func() { - Expect(adminClient.KilledAddresses).To(HaveLen(1)) - for _, address := range pickedProcessGroups[0].Addresses { - addr := fmt.Sprintf("%s:4501", address) - Expect(adminClient.KilledAddresses).NotTo(HaveKey(addr)) - } - }) - }) + When("doing an upgrade", func() { + BeforeEach(func() { + cluster.Spec.Version = fdbv1beta2.Versions.NextMajorVersion.String() + for _, processGroup := range cluster.Status.ProcessGroups { + processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) + } + }) - When("using multiple storage servers per pod", func() { - var pickedProcessGroups []*fdbv1beta2.ProcessGroupStatus + When("all processes are reporting", func() { + It("should requeue", func() { + Expect(requeue).NotTo(BeNil()) + }) - BeforeEach(func() { - cluster.Spec.StorageServersPerPod = 2 - Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred()) - result, err := reconcileCluster(cluster) - Expect(err).NotTo(HaveOccurred()) - Expect(result.Requeue).To(BeFalse()) - _, err = reloadCluster(cluster) - Expect(err).NotTo(HaveOccurred()) + It("should kill all the processes", func() { + addresses := make(map[string]fdbv1beta2.None, len(cluster.Status.ProcessGroups)) + for _, processGroup := range cluster.Status.ProcessGroups { + for _, address := range processGroup.Addresses { + addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} + if processGroup.ProcessClass == fdbv1beta2.ProcessClassStorage { + addresses[fmt.Sprintf("%s:4503", address)] = fdbv1beta2.None{} + } + } + } + Expect(adminClient.KilledAddresses).To(Equal(addresses)) + }) - pickedProcessGroups = internal.PickProcessGroups(cluster, fdbv1beta2.ProcessClassStorage, 2) - for _, processGroup := range pickedProcessGroups { - processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) - } - }) + It("should update the running version in the status", func() { + _, err = reloadCluster(cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(cluster.Status.RunningVersion).To(Equal(fdbv1beta2.Versions.NextMajorVersion.String())) + }) - It("should not requeue", func() { - Expect(requeue).To(BeNil()) - }) + It("should submit pending upgrade information for all the processes", func() { + expectedUpgrades := make(map[fdbv1beta2.ProcessGroupID]bool, len(cluster.Status.ProcessGroups)) + for _, processGroup := range cluster.Status.ProcessGroups { + expectedUpgrades[processGroup.ProcessGroupID] = true + } + pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) + Expect(err).NotTo(HaveOccurred()) + Expect(pendingUpgrades).To(Equal(expectedUpgrades)) + 
}) + }) - It("should kill the targeted processes", func() { - addresses := make(map[string]fdbv1beta2.None, 2) - for _, processGroup := range pickedProcessGroups { - for _, address := range processGroup.Addresses { - addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} - addresses[fmt.Sprintf("%s:4503", address)] = fdbv1beta2.None{} - } - } - Expect(adminClient.KilledAddresses).To(Equal(addresses)) + When("the processes of a pod are missing", func() { + var missingProcessGroup *fdbv1beta2.ProcessGroupStatus + + BeforeEach(func() { + for _, processGroup := range cluster.Status.ProcessGroups { + if processGroup.ProcessClass != fdbv1beta2.ProcessClassStorage { + continue + } + + missingProcessGroup = processGroup + break + } + + Expect(missingProcessGroup.ProcessClass).To(Equal(fdbv1beta2.ProcessClassStorage)) + adminClient.MockMissingProcessGroup(missingProcessGroup.ProcessGroupID, true) + missingProcessGroup.UpdateCondition(fdbv1beta2.MissingProcesses, true) + }) + + When("they are missing for a short time", func() { + BeforeEach(func() { + missingProcessGroup.UpdateCondition(fdbv1beta2.MissingProcesses, true) + }) + + It("should requeue", func() { + Expect(requeue).NotTo(BeNil()) + Expect(requeue.message).To(Equal(fmt.Sprintf("could not find address for processes: %s", []fdbv1beta2.ProcessGroupID{missingProcessGroup.ProcessGroupID}))) + }) + + It("shouldn't kill any processes", func() { + Expect(adminClient.KilledAddresses).To(BeEmpty()) + }) + + It("should not submit pending upgrade information", func() { + pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) + Expect(err).NotTo(HaveOccurred()) + Expect(pendingUpgrades).To(BeEmpty()) + }) + }) + + When("they are missing for a long time", func() { + BeforeEach(func() { + missingProcessGroup.ProcessGroupConditions = []*fdbv1beta2.ProcessGroupCondition{ + { + ProcessGroupConditionType: fdbv1beta2.MissingProcesses, + Timestamp: time.Now().Add(-5 * time.Minute).Unix(), + }, + } + }) + + It("should requeue", func() { + Expect(requeue).NotTo(BeNil()) + Expect(requeue.message).To(Equal("fetch latest status after upgrade")) + }) + + It("should kill the processes except the missing process", func() { + Expect(adminClient.KilledAddresses).NotTo(BeEmpty()) + Expect(adminClient.KilledAddresses).NotTo(ContainElement(missingProcessGroup.Addresses)) + }) + + It("should not submit pending upgrade information", func() { + pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) + Expect(err).NotTo(HaveOccurred()) + Expect(pendingUpgrades).NotTo(BeEmpty()) + }) + }) + }) + }) }) - When("doing an upgrade", func() { + Context("with multiple log servers per pod", func() { + var pickedProcessGroups []*fdbv1beta2.ProcessGroupStatus + BeforeEach(func() { - cluster.Spec.Version = fdbv1beta2.Versions.NextMajorVersion.String() - for _, processGroup := range cluster.Status.ProcessGroups { + cluster.Spec.LogServersPerPod = 2 + Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred()) + result, err := reconcileCluster(cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + _, err = reloadCluster(cluster) + Expect(err).NotTo(HaveOccurred()) + + pickedProcessGroups = internal.PickProcessGroups(cluster, fdbv1beta2.ProcessClassLog, 2) + for _, processGroup := range pickedProcessGroups { processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) } }) - When("all processes are reporting", func() { + It("should not requeue", func() { + 
Expect(requeue).To(BeNil()) + }) + + It("should kill the targeted processes", func() { + addresses := make(map[string]fdbv1beta2.None, 2) + for _, processGroup := range pickedProcessGroups { + for _, address := range processGroup.Addresses { + addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} + addresses[fmt.Sprintf("%s:4503", address)] = fdbv1beta2.None{} + } + } + Expect(adminClient.KilledAddresses).To(Equal(addresses)) + }) + + When("doing an upgrade", func() { + BeforeEach(func() { + cluster.Spec.Version = fdbv1beta2.Versions.NextMajorVersion.String() + for _, processGroup := range cluster.Status.ProcessGroups { + processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) + } + }) + It("should requeue", func() { Expect(requeue).NotTo(BeNil()) }) @@ -245,7 +390,7 @@ var _ = Describe("bounceProcesses", func() { for _, processGroup := range cluster.Status.ProcessGroups { for _, address := range processGroup.Addresses { addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} - if processGroup.ProcessClass == fdbv1beta2.ProcessClassStorage { + if processGroup.ProcessClass == fdbv1beta2.ProcessClassLog { addresses[fmt.Sprintf("%s:4503", address)] = fdbv1beta2.None{} } } @@ -269,110 +414,9 @@ var _ = Describe("bounceProcesses", func() { Expect(pendingUpgrades).To(Equal(expectedUpgrades)) }) }) - - When("the processes of a pod are missing", func() { - var missingProcessGroup *fdbv1beta2.ProcessGroupStatus - - BeforeEach(func() { - for _, processGroup := range cluster.Status.ProcessGroups { - if processGroup.ProcessClass != fdbv1beta2.ProcessClassStorage { - continue - } - - missingProcessGroup = processGroup - break - } - - Expect(missingProcessGroup.ProcessClass).To(Equal(fdbv1beta2.ProcessClassStorage)) - adminClient.MockMissingProcessGroup(missingProcessGroup.ProcessGroupID, true) - missingProcessGroup.UpdateCondition(fdbv1beta2.MissingProcesses, true) - }) - - When("they are missing for a short time", func() { - BeforeEach(func() { - missingProcessGroup.UpdateCondition(fdbv1beta2.MissingProcesses, true) - }) - - It("should requeue", func() { - Expect(requeue).NotTo(BeNil()) - Expect(requeue.message).To(Equal(fmt.Sprintf("could not find address for processes: %s", []fdbv1beta2.ProcessGroupID{missingProcessGroup.ProcessGroupID}))) - }) - - It("shouldn't kill any processes", func() { - Expect(adminClient.KilledAddresses).To(BeEmpty()) - }) - - It("should not submit pending upgrade information", func() { - pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) - Expect(err).NotTo(HaveOccurred()) - Expect(pendingUpgrades).To(BeEmpty()) - }) - }) - - When("they are missing for a long time", func() { - BeforeEach(func() { - missingProcessGroup.ProcessGroupConditions = []*fdbv1beta2.ProcessGroupCondition{ - { - ProcessGroupConditionType: fdbv1beta2.MissingProcesses, - Timestamp: time.Now().Add(-5 * time.Minute).Unix(), - }, - } - }) - - It("should requeue", func() { - Expect(requeue).NotTo(BeNil()) - Expect(requeue.message).To(Equal("fetch latest status after upgrade")) - }) - - It("should kill the processes except the missing process", func() { - Expect(adminClient.KilledAddresses).NotTo(BeEmpty()) - Expect(adminClient.KilledAddresses).NotTo(ContainElement(missingProcessGroup.Addresses)) - }) - - It("should not submit pending upgrade information", func() { - pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) - Expect(err).NotTo(HaveOccurred()) - Expect(pendingUpgrades).NotTo(BeEmpty()) - }) 
- }) - }) - }) - }) - - Context("with multiple log servers per pod", func() { - var pickedProcessGroups []*fdbv1beta2.ProcessGroupStatus - - BeforeEach(func() { - cluster.Spec.LogServersPerPod = 2 - Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred()) - result, err := reconcileCluster(cluster) - Expect(err).NotTo(HaveOccurred()) - Expect(result.Requeue).To(BeFalse()) - _, err = reloadCluster(cluster) - Expect(err).NotTo(HaveOccurred()) - - pickedProcessGroups = internal.PickProcessGroups(cluster, fdbv1beta2.ProcessClassLog, 2) - for _, processGroup := range pickedProcessGroups { - processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) - } - }) - - It("should not requeue", func() { - Expect(requeue).To(BeNil()) - }) - - It("should kill the targeted processes", func() { - addresses := make(map[string]fdbv1beta2.None, 2) - for _, processGroup := range pickedProcessGroups { - for _, address := range processGroup.Addresses { - addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} - addresses[fmt.Sprintf("%s:4503", address)] = fdbv1beta2.None{} - } - } - Expect(adminClient.KilledAddresses).To(Equal(addresses)) }) - When("doing an upgrade", func() { + Context("with a pending upgrade", func() { BeforeEach(func() { cluster.Spec.Version = fdbv1beta2.Versions.NextMajorVersion.String() for _, processGroup := range cluster.Status.ProcessGroups { @@ -389,9 +433,6 @@ var _ = Describe("bounceProcesses", func() { for _, processGroup := range cluster.Status.ProcessGroups { for _, address := range processGroup.Addresses { addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} - if processGroup.ProcessClass == fdbv1beta2.ProcessClassLog { - addresses[fmt.Sprintf("%s:4503", address)] = fdbv1beta2.None{} - } } } Expect(adminClient.KilledAddresses).To(Equal(addresses)) @@ -412,357 +453,391 @@ var _ = Describe("bounceProcesses", func() { Expect(err).NotTo(HaveOccurred()) Expect(pendingUpgrades).To(Equal(expectedUpgrades)) }) - }) - }) - - Context("with a pending upgrade", func() { - BeforeEach(func() { - cluster.Spec.Version = fdbv1beta2.Versions.NextMajorVersion.String() - for _, processGroup := range cluster.Status.ProcessGroups { - processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) - } - }) - It("should requeue", func() { - Expect(requeue).NotTo(BeNil()) - }) + Context("with an unknown process", func() { + BeforeEach(func() { + adminClient.MockAdditionalProcesses([]fdbv1beta2.ProcessGroupStatus{{ + ProcessGroupID: "dc2-storage-1", + ProcessClass: "storage", + Addresses: []string{"1.2.3.4"}, + }}) + }) - It("should kill all the processes", func() { - addresses := make(map[string]fdbv1beta2.None, len(cluster.Status.ProcessGroups)) - for _, processGroup := range cluster.Status.ProcessGroups { - for _, address := range processGroup.Addresses { - addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} - } - } - Expect(adminClient.KilledAddresses).To(Equal(addresses)) - }) + It("should requeue", func() { + Expect(requeue).NotTo(BeNil()) + Expect(requeue.message).To(Equal("Waiting for processes to be updated: [dc2-storage-1]")) + }) - It("should update the running version in the status", func() { - _, err = reloadCluster(cluster) - Expect(err).NotTo(HaveOccurred()) - Expect(cluster.Status.RunningVersion).To(Equal(fdbv1beta2.Versions.NextMajorVersion.String())) - }) + It("should not kill any processes", func() { + Expect(adminClient.KilledAddresses).To(BeEmpty()) + }) - It("should submit pending upgrade information for all the processes", func() 
{ - expectedUpgrades := make(map[fdbv1beta2.ProcessGroupID]bool, len(cluster.Status.ProcessGroups)) - for _, processGroup := range cluster.Status.ProcessGroups { - expectedUpgrades[processGroup.ProcessGroupID] = true - } - pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) - Expect(err).NotTo(HaveOccurred()) - Expect(pendingUpgrades).To(Equal(expectedUpgrades)) - }) + It("should not update the running version in the status", func() { + _, err = reloadCluster(cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(cluster.Status.RunningVersion).To(Equal(fdbv1beta2.Versions.Default.String())) + }) - Context("with an unknown process", func() { - BeforeEach(func() { - adminClient.MockAdditionalProcesses([]fdbv1beta2.ProcessGroupStatus{{ - ProcessGroupID: "dc2-storage-1", - ProcessClass: "storage", - Addresses: []string{"1.2.3.4"}, - }}) - }) + It("should submit pending upgrade information for all the processes", func() { + expectedUpgrades := make(map[fdbv1beta2.ProcessGroupID]bool, len(cluster.Status.ProcessGroups)) + for _, processGroup := range cluster.Status.ProcessGroups { + expectedUpgrades[processGroup.ProcessGroupID] = true + } + pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) + Expect(err).NotTo(HaveOccurred()) + Expect(pendingUpgrades).To(Equal(expectedUpgrades)) + }) - It("should requeue", func() { - Expect(requeue).NotTo(BeNil()) - Expect(requeue.message).To(Equal("Waiting for processes to be updated: [dc2-storage-1]")) - }) + Context("with a pending upgrade for the unknown process", func() { + BeforeEach(func() { + Expect(lockClient.AddPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion, []fdbv1beta2.ProcessGroupID{"dc2-storage-1"})).NotTo(HaveOccurred()) + }) - It("should not kill any processes", func() { - Expect(adminClient.KilledAddresses).To(BeEmpty()) - }) + It("should requeue", func() { + Expect(requeue).NotTo(BeNil()) + }) - It("should not update the running version in the status", func() { - _, err = reloadCluster(cluster) - Expect(err).NotTo(HaveOccurred()) - Expect(cluster.Status.RunningVersion).To(Equal(fdbv1beta2.Versions.Default.String())) - }) + It("should kill all the processes", func() { + addresses := make(map[string]fdbv1beta2.None, len(cluster.Status.ProcessGroups)+1) + for _, processGroup := range cluster.Status.ProcessGroups { + for _, address := range processGroup.Addresses { + addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} + } + } + addresses["1.2.3.4:4501"] = fdbv1beta2.None{} + Expect(adminClient.KilledAddresses).To(Equal(addresses)) + }) + }) - It("should submit pending upgrade information for all the processes", func() { - expectedUpgrades := make(map[fdbv1beta2.ProcessGroupID]bool, len(cluster.Status.ProcessGroups)) - for _, processGroup := range cluster.Status.ProcessGroups { - expectedUpgrades[processGroup.ProcessGroupID] = true - } - pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) - Expect(err).NotTo(HaveOccurred()) - Expect(pendingUpgrades).To(Equal(expectedUpgrades)) - }) + Context("with a pending upgrade to an older version", func() { + BeforeEach(func() { + err = lockClient.AddPendingUpgrades(fdbv1beta2.Versions.NextPatchVersion, []fdbv1beta2.ProcessGroupID{"dc2-storage-1"}) + Expect(err).NotTo(HaveOccurred()) + }) - Context("with a pending upgrade for the unknown process", func() { - BeforeEach(func() { - Expect(lockClient.AddPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion, 
[]fdbv1beta2.ProcessGroupID{"dc2-storage-1"})).NotTo(HaveOccurred()) - }) + It("should requeue", func() { + Expect(requeue).NotTo(BeNil()) + Expect(requeue.message).To(Equal("Waiting for processes to be updated: [dc2-storage-1]")) + }) - It("should requeue", func() { - Expect(requeue).NotTo(BeNil()) + It("should not kill any processes", func() { + Expect(adminClient.KilledAddresses).To(BeEmpty()) + }) }) - It("should kill all the processes", func() { - addresses := make(map[string]fdbv1beta2.None, len(cluster.Status.ProcessGroups)+1) - for _, processGroup := range cluster.Status.ProcessGroups { - for _, address := range processGroup.Addresses { - addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} + Context("with locks disabled", func() { + BeforeEach(func() { + cluster.Spec.LockOptions.DisableLocks = pointer.Bool(true) + }) + + It("should requeue", func() { + Expect(requeue).NotTo(BeNil()) + }) + + It("should kill all the processes", func() { + addresses := make(map[string]fdbv1beta2.None, len(cluster.Status.ProcessGroups)) + for _, processGroup := range cluster.Status.ProcessGroups { + for _, address := range processGroup.Addresses { + addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} + } } - } - addresses["1.2.3.4:4501"] = fdbv1beta2.None{} - Expect(adminClient.KilledAddresses).To(Equal(addresses)) + Expect(adminClient.KilledAddresses).To(Equal(addresses)) + }) + + It("should not submit pending upgrade information", func() { + pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) + Expect(err).NotTo(HaveOccurred()) + Expect(pendingUpgrades).To(BeEmpty()) + }) }) }) - Context("with a pending upgrade to an older version", func() { + When("one process is missing for a short time", func() { + var missingProcessGroup *fdbv1beta2.ProcessGroupStatus + BeforeEach(func() { - err = lockClient.AddPendingUpgrades(fdbv1beta2.Versions.NextPatchVersion, []fdbv1beta2.ProcessGroupID{"dc2-storage-1"}) - Expect(err).NotTo(HaveOccurred()) + missingProcessGroup = cluster.Status.ProcessGroups[0] + adminClient.MockMissingProcessGroup(missingProcessGroup.ProcessGroupID, true) + missingProcessGroup.UpdateCondition(fdbv1beta2.MissingProcesses, true) }) It("should requeue", func() { Expect(requeue).NotTo(BeNil()) - Expect(requeue.message).To(Equal("Waiting for processes to be updated: [dc2-storage-1]")) + Expect(requeue.message).To(Equal(fmt.Sprintf("could not find address for processes: %s", []fdbv1beta2.ProcessGroupID{missingProcessGroup.ProcessGroupID}))) }) - It("should not kill any processes", func() { + It("shouldn't kill any processes", func() { Expect(adminClient.KilledAddresses).To(BeEmpty()) }) + + It("should not submit pending upgrade information", func() { + pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) + Expect(err).NotTo(HaveOccurred()) + Expect(pendingUpgrades).To(BeEmpty()) + }) }) - Context("with locks disabled", func() { + When("one process is missing is missing for a long time", func() { + var missingProcessGroup *fdbv1beta2.ProcessGroupStatus + BeforeEach(func() { - cluster.Spec.LockOptions.DisableLocks = pointer.Bool(true) + missingProcessGroup = cluster.Status.ProcessGroups[0] + adminClient.MockMissingProcessGroup(missingProcessGroup.ProcessGroupID, true) + missingProcessGroup.ProcessGroupConditions = []*fdbv1beta2.ProcessGroupCondition{ + { + ProcessGroupConditionType: fdbv1beta2.MissingProcesses, + Timestamp: time.Now().Add(-5 * time.Minute).Unix(), + }, + } }) It("should requeue", 
func() { Expect(requeue).NotTo(BeNil()) + Expect(requeue.message).To(Equal("fetch latest status after upgrade")) }) - It("should kill all the processes", func() { - addresses := make(map[string]fdbv1beta2.None, len(cluster.Status.ProcessGroups)) - for _, processGroup := range cluster.Status.ProcessGroups { - for _, address := range processGroup.Addresses { - addresses[fmt.Sprintf("%s:4501", address)] = fdbv1beta2.None{} - } - } - Expect(adminClient.KilledAddresses).To(Equal(addresses)) + It("should kill the processes except the missing process", func() { + Expect(adminClient.KilledAddresses).NotTo(BeEmpty()) + Expect(adminClient.KilledAddresses).NotTo(ContainElement(missingProcessGroup.Addresses)) }) It("should not submit pending upgrade information", func() { pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) Expect(err).NotTo(HaveOccurred()) - Expect(pendingUpgrades).To(BeEmpty()) + Expect(pendingUpgrades).NotTo(BeEmpty()) }) }) }) - When("one process is missing for a short time", func() { - var missingProcessGroup *fdbv1beta2.ProcessGroupStatus + When("the buggify option ignoreDuringRestart is set", func() { + var ignoredProcessGroup *fdbv1beta2.ProcessGroupStatus BeforeEach(func() { - missingProcessGroup = cluster.Status.ProcessGroups[0] - adminClient.MockMissingProcessGroup(missingProcessGroup.ProcessGroupID, true) - missingProcessGroup.UpdateCondition(fdbv1beta2.MissingProcesses, true) - }) - - It("should requeue", func() { - Expect(requeue).NotTo(BeNil()) - Expect(requeue.message).To(Equal(fmt.Sprintf("could not find address for processes: %s", []fdbv1beta2.ProcessGroupID{missingProcessGroup.ProcessGroupID}))) - }) - - It("shouldn't kill any processes", func() { - Expect(adminClient.KilledAddresses).To(BeEmpty()) + ignoredProcessGroup = cluster.Status.ProcessGroups[0] + cluster.Spec.Buggify.IgnoreDuringRestart = []fdbv1beta2.ProcessGroupID{ignoredProcessGroup.ProcessGroupID} + for _, processGroup := range cluster.Status.ProcessGroups { + processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) + } }) - It("should not submit pending upgrade information", func() { - pendingUpgrades, err := lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) + It("should not requeue", func() { Expect(err).NotTo(HaveOccurred()) - Expect(pendingUpgrades).To(BeEmpty()) + Expect(requeue).To(BeNil()) }) - }) - When("one process is missing is missing for a long time", func() { - var missingProcessGroup *fdbv1beta2.ProcessGroupStatus + It("should not kill any processes", func() { + ignoredAddress := ignoredProcessGroup.Addresses[0] - BeforeEach(func() { - missingProcessGroup = cluster.Status.ProcessGroups[0] - adminClient.MockMissingProcessGroup(missingProcessGroup.ProcessGroupID, true) - missingProcessGroup.ProcessGroupConditions = []*fdbv1beta2.ProcessGroupCondition{ - { - ProcessGroupConditionType: fdbv1beta2.MissingProcesses, - Timestamp: time.Now().Add(-5 * time.Minute).Unix(), - }, + for address := range adminClient.KilledAddresses { + Expect(address).NotTo(HavePrefix(ignoredAddress)) } - }) - - It("should requeue", func() { - Expect(requeue).NotTo(BeNil()) - Expect(requeue.message).To(Equal("fetch latest status after upgrade")) - }) - - It("should kill the processes except the missing process", func() { - Expect(adminClient.KilledAddresses).NotTo(BeEmpty()) - Expect(adminClient.KilledAddresses).NotTo(ContainElement(missingProcessGroup.Addresses)) - }) - It("should not submit pending upgrade information", func() { - pendingUpgrades, err 
:= lockClient.GetPendingUpgrades(fdbv1beta2.Versions.NextMajorVersion) - Expect(err).NotTo(HaveOccurred()) - Expect(pendingUpgrades).NotTo(BeEmpty()) + Expect(adminClient.KilledAddresses).To(HaveLen(len(cluster.Status.ProcessGroups) - 1)) }) - }) - }) - - When("the buggify option ignoreDuringRestart is set", func() { - var ignoredProcessGroup *fdbv1beta2.ProcessGroupStatus - BeforeEach(func() { - ignoredProcessGroup = cluster.Status.ProcessGroups[0] - cluster.Spec.Buggify.IgnoreDuringRestart = []fdbv1beta2.ProcessGroupID{ignoredProcessGroup.ProcessGroupID} - for _, processGroup := range cluster.Status.ProcessGroups { - processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, true) - } - }) - - It("should not requeue", func() { - Expect(err).NotTo(HaveOccurred()) - Expect(requeue).To(BeNil()) - }) + When("filtering process groups for buggify", func() { + var filteredAddresses []fdbv1beta2.ProcessAddress + var removed bool - It("should not kill any processes", func() { - ignoredAddress := ignoredProcessGroup.Addresses[0] - - for address := range adminClient.KilledAddresses { - Expect(address).NotTo(HavePrefix(ignoredAddress)) - } - - Expect(adminClient.KilledAddresses).To(HaveLen(len(cluster.Status.ProcessGroups) - 1)) - }) - - When("filtering process groups for buggify", func() { - var filteredAddresses []fdbv1beta2.ProcessAddress - var removed bool + BeforeEach(func() { + status, err := adminClient.GetStatus() + Expect(err).NotTo(HaveOccurred()) + processAddresses := make([]fdbv1beta2.ProcessAddress, 0, len(cluster.Status.ProcessGroups)) + for _, process := range status.Cluster.Processes { + processAddresses = append(processAddresses, process.Address) + } - BeforeEach(func() { - status, err := adminClient.GetStatus() - Expect(err).NotTo(HaveOccurred()) - processAddresses := make([]fdbv1beta2.ProcessAddress, 0, len(cluster.Status.ProcessGroups)) - for _, process := range status.Cluster.Processes { - processAddresses = append(processAddresses, process.Address) - } + filteredAddresses, removed = buggify.FilterIgnoredProcessGroups(cluster, processAddresses, status) + }) - filteredAddresses, removed = buggify.FilterIgnoredProcessGroups(cluster, processAddresses, status) + It("should filter the ignored address", func() { + Expect(removed).To(BeTrue()) + Expect(len(filteredAddresses)).To(BeNumerically("==", len(cluster.Status.ProcessGroups)-1)) + }) }) - It("should filter the ignored address", func() { - Expect(removed).To(BeTrue()) - Expect(len(filteredAddresses)).To(BeNumerically("==", len(cluster.Status.ProcessGroups)-1)) - }) - }) + When("buggify ignore processes is set and only this process has the IncorrectCommandLine condition", func() { + var filteredAddresses []fdbv1beta2.ProcessAddress + var removed bool - When("buggify ignore processes is set and only this process has the IncorrectCommandLine condition", func() { - var filteredAddresses []fdbv1beta2.ProcessAddress - var removed bool + BeforeEach(func() { + for _, processGroup := range cluster.Status.ProcessGroups { + processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, processGroup.ProcessGroupID == ignoredProcessGroup.ProcessGroupID) + } - BeforeEach(func() { - for _, processGroup := range cluster.Status.ProcessGroups { - processGroup.UpdateCondition(fdbv1beta2.IncorrectCommandLine, processGroup.ProcessGroupID == ignoredProcessGroup.ProcessGroupID) - } + status, err := adminClient.GetStatus() + Expect(err).NotTo(HaveOccurred()) + processAddresses := make([]fdbv1beta2.ProcessAddress, 0, len(cluster.Status.ProcessGroups)) 
+ for _, process := range status.Cluster.Processes { + if fdbv1beta2.ProcessGroupID(process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey]) != ignoredProcessGroup.ProcessGroupID { + continue + } - status, err := adminClient.GetStatus() - Expect(err).NotTo(HaveOccurred()) - processAddresses := make([]fdbv1beta2.ProcessAddress, 0, len(cluster.Status.ProcessGroups)) - for _, process := range status.Cluster.Processes { - if fdbv1beta2.ProcessGroupID(process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey]) != ignoredProcessGroup.ProcessGroupID { - continue + processAddresses = append(processAddresses, process.Address) } - processAddresses = append(processAddresses, process.Address) - } - - filteredAddresses, removed = buggify.FilterIgnoredProcessGroups(cluster, processAddresses, status) - }) + filteredAddresses, removed = buggify.FilterIgnoredProcessGroups(cluster, processAddresses, status) + }) - It("should filter the ignored address", func() { - Expect(removed).To(BeTrue()) - Expect(filteredAddresses).To(BeEmpty()) - }) + It("should filter the ignored address", func() { + Expect(removed).To(BeTrue()) + Expect(filteredAddresses).To(BeEmpty()) + }) - It("should not requeue", func() { - Expect(err).NotTo(HaveOccurred()) - Expect(requeue).To(BeNil()) - }) + It("should not requeue", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeNil()) + }) - It("should not kill any processes", func() { - Expect(adminClient.KilledAddresses).To(BeEmpty()) + It("should not kill any processes", func() { + Expect(adminClient.KilledAddresses).To(BeEmpty()) + }) }) }) - }) - When("when there are unreachable processes", func() { - When("the unreachable processes include at least one tester process", func() { - BeforeEach(func() { - adminClient, err = mock.NewMockAdminClientUncast(cluster, k8sClient) - Expect(err).NotTo(HaveOccurred()) + When("when there are unreachable processes", func() { + When("the unreachable processes include at least one tester process", func() { + BeforeEach(func() { + adminClient, err = mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) - adminClient.FrozenStatus = &fdbv1beta2.FoundationDBStatus{ - Client: fdbv1beta2.FoundationDBStatusLocalClientInfo{ - DatabaseStatus: fdbv1beta2.FoundationDBStatusClientDBStatus{ - Available: true, - }, - }, - Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ - Messages: []fdbv1beta2.FoundationDBStatusMessage{ - { - Name: "status_incomplete", + adminClient.FrozenStatus = &fdbv1beta2.FoundationDBStatus{ + Client: fdbv1beta2.FoundationDBStatusLocalClientInfo{ + DatabaseStatus: fdbv1beta2.FoundationDBStatusClientDBStatus{ + Available: true, }, - { - Name: "unreachable_processes", - UnreachableProcesses: []fdbv1beta2.FoundationDBUnreachableProcess{ - { - Address: "192.168.0.1:4500:tls", + }, + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + Messages: []fdbv1beta2.FoundationDBStatusMessage{ + { + Name: "status_incomplete", + }, + { + Name: "unreachable_processes", + UnreachableProcesses: []fdbv1beta2.FoundationDBUnreachableProcess{ + { + Address: "192.168.0.1:4500:tls", + }, }, }, }, - }, - Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ - "1": { - ProcessClass: fdbv1beta2.ProcessClassStateless, - Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.2:4500:tls"}, - Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{ - { - Role: string(fdbv1beta2.ProcessRoleClusterController), + Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ + 
"1": { + ProcessClass: fdbv1beta2.ProcessClassStateless, + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.2:4500:tls"}, + Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{ + { + Role: string(fdbv1beta2.ProcessRoleClusterController), + }, + }, + UptimeSeconds: 61.0, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "1", }, }, - UptimeSeconds: 61.0, - Locality: map[string]string{ - fdbv1beta2.FDBLocalityInstanceIDKey: "1", - }, - }, - "2": { - ProcessClass: fdbv1beta2.ProcessClassTest, - Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.1:4500:tls"}, - UptimeSeconds: 61.0, - Locality: map[string]string{ - fdbv1beta2.FDBLocalityInstanceIDKey: "2", + "2": { + ProcessClass: fdbv1beta2.ProcessClassTest, + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.1:4500:tls"}, + UptimeSeconds: 61.0, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "2", + }, }, }, }, - }, - } - }) + } + }) - It("should not requeue", func() { - Expect(err).NotTo(HaveOccurred()) - Expect(requeue).To(BeNil()) - }) + It("should not requeue", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeNil()) + }) + + It("should kill the cluster controller", func() { + Expect(adminClient.KilledAddresses).To(HaveKey("192.168.0.2:4500:tls")) + }) + + When("the minimum required uptime is bigger than the current uptime", func() { + BeforeEach(func() { + clusterReconciler.MinimumRequiredUptimeCCBounce = 2 * time.Minute + }) - It("should kill the cluster controller", func() { - Expect(adminClient.KilledAddresses).To(HaveKey("192.168.0.2:4500:tls")) + AfterEach(func() { + clusterReconciler.MinimumRequiredUptimeCCBounce = 0 + }) + + It("should not requeue", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeNil()) + }) + + It("should not kill the cluster controller", func() { + Expect(adminClient.KilledAddresses).To(BeEmpty()) + }) + }) + + When("the FDB version automatically removes old tester processes", func() { + var previousVersion string + + BeforeEach(func() { + previousVersion = cluster.Status.RunningVersion + cluster.Spec.Version = "7.1.57" + cluster.Status.RunningVersion = "7.1.57" + }) + + AfterEach(func() { + cluster.Status.RunningVersion = previousVersion + cluster.Spec.Version = previousVersion + }) + + It("should not requeue", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(BeNil()) + }) + + It("should not kill the cluster controller", func() { + Expect(adminClient.KilledAddresses).To(BeEmpty()) + }) + }) }) - When("the minimum required uptime is bigger than the current uptime", func() { + When("the unreachable processes include no tester processes", func() { BeforeEach(func() { - clusterReconciler.MinimumRequiredUptimeCCBounce = 2 * time.Minute - }) + adminClient, err = mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) - AfterEach(func() { - clusterReconciler.MinimumRequiredUptimeCCBounce = 0 + adminClient.FrozenStatus = &fdbv1beta2.FoundationDBStatus{ + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + Messages: []fdbv1beta2.FoundationDBStatusMessage{ + { + Name: "status_incomplete", + }, + { + Name: "unreachable_processes", + UnreachableProcesses: []fdbv1beta2.FoundationDBUnreachableProcess{ + { + Address: "192.168.0.1:4500:tls", + }, + }, + }, + }, + Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ + "1": { + ProcessClass: fdbv1beta2.ProcessClassStateless, + Address: fdbv1beta2.ProcessAddress{StringAddress: 
"192.168.0.2:4500:tls"}, + Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{ + { + Role: string(fdbv1beta2.ProcessRoleClusterController), + }, + }, + }, + "2": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.1:4500:tls"}, + }, + }, + }, + } }) It("should not requeue", func() { @@ -775,18 +850,40 @@ var _ = Describe("bounceProcesses", func() { }) }) - When("the FDB version automatically removes old tester processes", func() { - var previousVersion string - + When("the cluster message does not contain unreachable processes", func() { BeforeEach(func() { - previousVersion = cluster.Status.RunningVersion - cluster.Spec.Version = "7.1.57" - cluster.Status.RunningVersion = "7.1.57" - }) + adminClient, err = mock.NewMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) - AfterEach(func() { - cluster.Status.RunningVersion = previousVersion - cluster.Spec.Version = previousVersion + adminClient.FrozenStatus = &fdbv1beta2.FoundationDBStatus{ + Client: fdbv1beta2.FoundationDBStatusLocalClientInfo{ + DatabaseStatus: fdbv1beta2.FoundationDBStatusClientDBStatus{ + Available: true, + }, + }, + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + Messages: []fdbv1beta2.FoundationDBStatusMessage{ + { + Name: "status_incomplete", + }, + }, + Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ + "1": { + ProcessClass: fdbv1beta2.ProcessClassStateless, + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.2:4500:tls"}, + Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{ + { + Role: string(fdbv1beta2.ProcessRoleClusterController), + }, + }, + }, + "2": { + ProcessClass: fdbv1beta2.ProcessClassTest, + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.1:4500:tls"}, + }, + }, + }, + } }) It("should not requeue", func() { @@ -799,100 +896,196 @@ var _ = Describe("bounceProcesses", func() { }) }) }) + }) - When("the unreachable processes include no tester processes", func() { - BeforeEach(func() { - adminClient, err = mock.NewMockAdminClientUncast(cluster, k8sClient) - Expect(err).NotTo(HaveOccurred()) - - adminClient.FrozenStatus = &fdbv1beta2.FoundationDBStatus{ - Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ - Messages: []fdbv1beta2.FoundationDBStatusMessage{ - { - Name: "status_incomplete", + DescribeTable("when getting all the addresses to upgrade from the current status", func(inputStatus *fdbv1beta2.FoundationDBStatus, pendingUpgrades map[fdbv1beta2.ProcessGroupID]bool, version string, expectedAddresses []fdbv1beta2.ProcessAddress, expectedNotReady []string) { + addresses, notReady := getUpgradeAddressesFromStatus(GinkgoLogr, inputStatus, pendingUpgrades, version) + Expect(addresses).To(ConsistOf(expectedAddresses)) + Expect(notReady).To(ConsistOf(expectedNotReady)) + }, + Entry("no tester processes are present", + &fdbv1beta2.FoundationDBStatus{ + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ + "1": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "1", }, - { - Name: "unreachable_processes", - UnreachableProcesses: []fdbv1beta2.FoundationDBUnreachableProcess{ - { - Address: "192.168.0.1:4500:tls", - }, - }, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.2:4500:tls"}, + }, + }, + }, + }, + map[fdbv1beta2.ProcessGroupID]bool{ + "1": true, + }, + "7.1.56", + 
[]fdbv1beta2.ProcessAddress{{StringAddress: "192.168.0.2:4500:tls"}}, + []string{}, + ), + Entry("a tester process is present", + &fdbv1beta2.FoundationDBStatus{ + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ + "1": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "1", }, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.2:4500:tls"}, }, - Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ - "1": { - ProcessClass: fdbv1beta2.ProcessClassStateless, - Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.2:4500:tls"}, - Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{ - { - Role: string(fdbv1beta2.ProcessRoleClusterController), - }, - }, + "2": { + ProcessClass: fdbv1beta2.ProcessClassTest, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "2", }, - "2": { - ProcessClass: fdbv1beta2.ProcessClassStorage, - Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.1:4500:tls"}, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.3:4500:tls"}, + }, + }, + }, + }, + map[fdbv1beta2.ProcessGroupID]bool{ + "1": true, + }, + "7.1.56", + []fdbv1beta2.ProcessAddress{{StringAddress: "192.168.0.2:4500:tls"}}, + []string{}, + ), + Entry("a process with missing localities is present", + &fdbv1beta2.FoundationDBStatus{ + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ + "1": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "1", + }, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.2:4500:tls"}, + }, + "2": { + ProcessClass: fdbv1beta2.ProcessClassTest, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "2", }, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.3:4500:tls"}, + }, + "3": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Locality: map[string]string{}, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.4:4500:tls"}, }, }, - } - }) - - It("should not requeue", func() { - Expect(err).NotTo(HaveOccurred()) - Expect(requeue).To(BeNil()) - }) - - It("should not kill the cluster controller", func() { - Expect(adminClient.KilledAddresses).To(BeEmpty()) - }) - }) - - When("the cluster message does not contain unreachable processes", func() { - BeforeEach(func() { - adminClient, err = mock.NewMockAdminClientUncast(cluster, k8sClient) - Expect(err).NotTo(HaveOccurred()) - - adminClient.FrozenStatus = &fdbv1beta2.FoundationDBStatus{ - Client: fdbv1beta2.FoundationDBStatusLocalClientInfo{ - DatabaseStatus: fdbv1beta2.FoundationDBStatusClientDBStatus{ - Available: true, + }, + }, + map[fdbv1beta2.ProcessGroupID]bool{ + "1": true, + }, + "7.1.56", + []fdbv1beta2.ProcessAddress{{StringAddress: "192.168.0.2:4500:tls"}}, + []string{}, + ), + Entry("a process which is already upgraded is present", + &fdbv1beta2.FoundationDBStatus{ + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ + "1": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "1", + }, + Version: "6.3.43", + Address: 
fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.2:4500:tls"}, + }, + "2": { + ProcessClass: fdbv1beta2.ProcessClassTest, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "2", + }, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.3:4500:tls"}, + }, + "3": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Locality: map[string]string{}, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.4:4500:tls"}, + }, + "4": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "4", + }, + Version: "7.1.56", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.5:4500:tls"}, }, }, - Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ - Messages: []fdbv1beta2.FoundationDBStatusMessage{ - { - Name: "status_incomplete", + }, + }, + map[fdbv1beta2.ProcessGroupID]bool{ + "1": true, + }, + "7.1.56", + []fdbv1beta2.ProcessAddress{{StringAddress: "192.168.0.2:4500:tls"}}, + []string{}, + ), + Entry("a process is missing in the pending upgrades map", + &fdbv1beta2.FoundationDBStatus{ + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ + "1": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "1", }, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.2:4500:tls"}, }, - Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{ - "1": { - ProcessClass: fdbv1beta2.ProcessClassStateless, - Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.2:4500:tls"}, - Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{ - { - Role: string(fdbv1beta2.ProcessRoleClusterController), - }, - }, + "2": { + ProcessClass: fdbv1beta2.ProcessClassTest, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "2", + }, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.3:4500:tls"}, + }, + "3": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Locality: map[string]string{}, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.4:4500:tls"}, + }, + "4": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "4", }, - "2": { - ProcessClass: fdbv1beta2.ProcessClassTest, - Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.1:4500:tls"}, + Version: "7.1.56", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.5:4500:tls"}, + }, + "5": { + ProcessClass: fdbv1beta2.ProcessClassStorage, + Locality: map[string]string{ + fdbv1beta2.FDBLocalityInstanceIDKey: "5", }, + Version: "6.3.43", + Address: fdbv1beta2.ProcessAddress{StringAddress: "192.168.0.6:4500:tls"}, }, }, - } - }) - - It("should not requeue", func() { - Expect(err).NotTo(HaveOccurred()) - Expect(requeue).To(BeNil()) - }) - - It("should not kill the cluster controller", func() { - Expect(adminClient.KilledAddresses).To(BeEmpty()) - }) - }) - }) + }, + }, + map[fdbv1beta2.ProcessGroupID]bool{ + "1": true, + }, + "7.1.56", + []fdbv1beta2.ProcessAddress{{StringAddress: "192.168.0.2:4500:tls"}}, + []string{"5"}, + ), + ) }) diff --git a/e2e/fixtures/fdb_cluster.go b/e2e/fixtures/fdb_cluster.go index 9cb6a3ed..cf16586f 100644 --- a/e2e/fixtures/fdb_cluster.go +++ b/e2e/fixtures/fdb_cluster.go @@ -21,9 +21,8 @@ package fixtures import ( - ctx 
"context" + "context" "fmt" - "golang.org/x/net/context" "log" "math" "strconv" @@ -31,6 +30,9 @@ import ( "sync" "time" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/types" @@ -118,18 +120,18 @@ func (fdbCluster *FdbCluster) WaitUntilExists() { gomega.Eventually(func() error { return fdbCluster.getClient(). - Get(ctx.Background(), key, &clusterRequest) + Get(context.Background(), key, &clusterRequest) }).WithTimeout(2 * time.Minute).ShouldNot(gomega.HaveOccurred()) } // Create asynchronously creates this FDB cluster. func (fdbCluster *FdbCluster) Create() error { - return fdbCluster.getClient().Create(ctx.Background(), fdbCluster.cluster) + return fdbCluster.getClient().Create(context.Background(), fdbCluster.cluster) } // Update asynchronously updates this FDB cluster definition. func (fdbCluster *FdbCluster) Update() error { - return fdbCluster.getClient().Update(ctx.Background(), fdbCluster.cluster) + return fdbCluster.getClient().Update(context.Background(), fdbCluster.cluster) } // ReconciliationOptions defines the different reconciliation options. @@ -341,7 +343,7 @@ func (fdbCluster *FdbCluster) ForceReconcile() { // This will apply an Annotation to the object which will trigger the reconcile loop. // This should speed up the reconcile phase. err := fdbCluster.getClient().Patch( - ctx.Background(), + context.Background(), fdbCluster.cluster, patch) if err != nil { @@ -429,7 +431,7 @@ func (fdbCluster *FdbCluster) UpdateClusterStatusWithStatus(desiredStatus *fdbv1 // Try a few times before giving up. gomega.Eventually(func(g gomega.Gomega) bool { err := fdbCluster.getClient(). - Get(ctx.Background(), client.ObjectKeyFromObject(fdbCluster.cluster), fetchedCluster) + Get(context.Background(), client.ObjectKeyFromObject(fdbCluster.cluster), fetchedCluster) g.Expect(err).NotTo(gomega.HaveOccurred(), "error fetching cluster") updated := equality.Semantic.DeepEqual(fetchedCluster.Status, *desiredStatus) @@ -439,7 +441,7 @@ func (fdbCluster *FdbCluster) UpdateClusterStatusWithStatus(desiredStatus *fdbv1 } desiredStatus.DeepCopyInto(&fetchedCluster.Status) - err = fdbCluster.getClient().Status().Update(ctx.Background(), fetchedCluster) + err = fdbCluster.getClient().Status().Update(context.Background(), fetchedCluster) g.Expect(err).NotTo(gomega.HaveOccurred(), "error updating cluster status") // Retry here and let the method fetch the latest version of the cluster again until the spec is updated. return false @@ -470,7 +472,7 @@ func (fdbCluster *FdbCluster) UpdateClusterSpecWithSpec(desiredSpec *fdbv1beta2. // Try a few times before giving up. gomega.Eventually(func(g gomega.Gomega) bool { err := fdbCluster.getClient(). - Get(ctx.Background(), client.ObjectKeyFromObject(fdbCluster.cluster), fetchedCluster) + Get(context.Background(), client.ObjectKeyFromObject(fdbCluster.cluster), fetchedCluster) g.Expect(err).NotTo(gomega.HaveOccurred(), "error fetching cluster") specUpdated := equality.Semantic.DeepEqual(fetchedCluster.Spec, *desiredSpec) @@ -480,7 +482,7 @@ func (fdbCluster *FdbCluster) UpdateClusterSpecWithSpec(desiredSpec *fdbv1beta2. 
} desiredSpec.DeepCopyInto(&fetchedCluster.Spec) - err = fdbCluster.getClient().Update(ctx.Background(), fetchedCluster) + err = fdbCluster.getClient().Update(context.Background(), fetchedCluster) g.Expect(err).NotTo(gomega.HaveOccurred(), "error updating cluster spec") // Retry here and let the method fetch the latest version of the cluster again until the spec is updated. return false @@ -495,7 +497,7 @@ func (fdbCluster *FdbCluster) GetAllPods() *corev1.PodList { gomega.Eventually(func() error { return fdbCluster.getClient(). - List(ctx.TODO(), podList, client.MatchingLabels(fdbCluster.cluster.GetMatchLabels())) + List(context.Background(), podList, client.MatchingLabels(fdbCluster.cluster.GetMatchLabels())) }).WithTimeout(1 * time.Minute).WithPolling(1 * time.Second).ShouldNot(gomega.HaveOccurred()) return podList @@ -506,7 +508,7 @@ func (fdbCluster *FdbCluster) GetPods() *corev1.PodList { podList := &corev1.PodList{} gomega.Eventually(func() error { - return fdbCluster.getClient().List(ctx.TODO(), podList, + return fdbCluster.getClient().List(context.Background(), podList, client.InNamespace(fdbCluster.Namespace()), client.MatchingLabels(fdbCluster.cluster.GetMatchLabels()), client.MatchingFields(map[string]string{"status.phase": string(corev1.PodRunning)}), @@ -534,7 +536,7 @@ func (fdbCluster *FdbCluster) getPodsByProcessClass( podList := &corev1.PodList{} gomega.Eventually(func() error { - return fdbCluster.getClient().List(ctx.TODO(), podList, + return fdbCluster.getClient().List(context.Background(), podList, client.InNamespace(fdbCluster.Namespace()), client.MatchingLabels(map[string]string{ fdbv1beta2.FDBClusterLabel: fdbCluster.cluster.Name, @@ -570,7 +572,7 @@ func (fdbCluster *FdbCluster) GetPod(name string) *corev1.Pod { // Retry if for some reasons an error is returned gomega.Eventually(func() error { return fdbCluster.getClient(). - Get(ctx.TODO(), client.ObjectKey{Name: name, Namespace: fdbCluster.Namespace()}, pod) + Get(context.Background(), client.ObjectKey{Name: name, Namespace: fdbCluster.Namespace()}, pod) }).WithTimeout(2 * time.Minute).WithPolling(1 * time.Second).ShouldNot(gomega.HaveOccurred()) return pod @@ -601,7 +603,7 @@ func (fdbCluster *FdbCluster) GetVolumeClaimsForProcesses( volumeClaimList := &corev1.PersistentVolumeClaimList{} gomega.Expect( fdbCluster.getClient(). - List(ctx.TODO(), volumeClaimList, + List(context.Background(), volumeClaimList, client.InNamespace(fdbCluster.Namespace()), client.MatchingLabels(map[string]string{ fdbv1beta2.FDBClusterLabel: fdbCluster.cluster.Name, @@ -780,7 +782,7 @@ func (fdbCluster *FdbCluster) SetPodAsUnschedulable(pod corev1.Pod) error { fetchedPod := &corev1.Pod{} return wait.PollImmediate(2*time.Second, 5*time.Minute, func() (bool, error) { err := fdbCluster.getClient(). - Get(ctx.Background(), client.ObjectKeyFromObject(&pod), fetchedPod) + Get(context.Background(), client.ObjectKeyFromObject(&pod), fetchedPod) if err != nil { if kubeErrors.IsNotFound(err) { return false, nil @@ -790,7 +792,7 @@ func (fdbCluster *FdbCluster) SetPodAsUnschedulable(pod corev1.Pod) error { // Try deleting the Pod as a workaround until the operator handle all cases. 
if fetchedPod.Spec.NodeName != "" && fetchedPod.DeletionTimestamp.IsZero() { - _ = fdbCluster.getClient().Delete(ctx.Background(), &pod) + _ = fdbCluster.getClient().Delete(context.Background(), &pod) } return fetchedPod.Spec.NodeName == "", nil @@ -849,7 +851,7 @@ func (fdbCluster *FdbCluster) GetServices() *corev1.ServiceList { serviceList := &corev1.ServiceList{} gomega.Expect( fdbCluster.getClient().List( - ctx.TODO(), + context.Background(), serviceList, client.InNamespace(fdbCluster.Namespace()), client.MatchingLabels(fdbCluster.GetResourceLabels())), @@ -920,7 +922,7 @@ func (fdbCluster *FdbCluster) WaitForPodRemoval(pod *corev1.Pod) { fetchedPod := &corev1.Pod{} gomega.Eventually(func() bool { err := fdbCluster.getClient(). - Get(ctx.Background(), client.ObjectKeyFromObject(pod), fetchedPod) + Get(context.Background(), client.ObjectKeyFromObject(pod), fetchedPod) if err != nil && kubeErrors.IsNotFound(err) { return true } @@ -946,7 +948,7 @@ func (fdbCluster *FdbCluster) WaitForPodRemoval(pod *corev1.Pod) { // This will apply an Annotation to the object which will trigger the reconcile loop. // This should speed up the reconcile phase. _ = fdbCluster.getClient().Patch( - ctx.Background(), + context.Background(), resCluster, patch) counter = -1 @@ -996,7 +998,7 @@ func (fdbCluster *FdbCluster) SetFinalizerForPvc( ) error { patch := client.MergeFrom(pvc.DeepCopy()) pvc.SetFinalizers(finalizers) - return fdbCluster.getClient().Patch(ctx.Background(), &pvc, patch) + return fdbCluster.getClient().Patch(context.Background(), &pvc, patch) } // UpdateStorageClass this will set the StorageClass for the provided process class of the current FoundationDBCluster. @@ -1008,7 +1010,7 @@ func (fdbCluster *FdbCluster) UpdateStorageClass( resCluster := fdbCluster.GetCluster() patch := client.MergeFrom(resCluster.DeepCopy()) resCluster.Spec.Processes[processClass].VolumeClaimTemplate.Spec.StorageClassName = &storageClass - _ = fdbCluster.getClient().Patch(ctx.Background(), resCluster, patch) + _ = fdbCluster.getClient().Patch(context.Background(), resCluster, patch) return fdbCluster.WaitForReconciliation() } @@ -1203,7 +1205,7 @@ func (fdbCluster *FdbCluster) GetPodTemplateSpec( func (fdbCluster *FdbCluster) CheckPodIsDeleted(podName string) bool { pod := &corev1.Pod{} err := fdbCluster.getClient(). - Get(ctx.TODO(), client.ObjectKey{Namespace: fdbCluster.Namespace(), Name: podName}, pod) + Get(context.Background(), client.ObjectKey{Namespace: fdbCluster.Namespace(), Name: podName}, pod) if err != nil { if kubeErrors.IsNotFound(err) { @@ -1238,7 +1240,7 @@ func (fdbCluster *FdbCluster) SetUseDNSInClusterFile(useDNSInClusterFile bool) e // Destroy will remove the underlying cluster. func (fdbCluster *FdbCluster) Destroy() error { return fdbCluster.getClient(). - Delete(ctx.Background(), fdbCluster.cluster) + Delete(context.Background(), fdbCluster.cluster) } // SetIgnoreMissingProcessesSeconds sets the IgnoreMissingProcessesSeconds setting. @@ -1310,7 +1312,7 @@ func (fdbCluster *FdbCluster) UpdateContainerImage(pod *corev1.Pod, containerNam pod.Spec.Containers[idx].Image = image } - gomega.Expect(fdbCluster.factory.GetControllerRuntimeClient().Update(ctx.Background(), pod)).NotTo(gomega.HaveOccurred()) + gomega.Expect(fdbCluster.factory.GetControllerRuntimeClient().Update(context.Background(), pod)).NotTo(gomega.HaveOccurred()) } // SetBuggifyBlockRemoval will set the provided list of process group IDs to be blocked for removal. 
@@ -1376,7 +1378,7 @@ func (fdbCluster *FdbCluster) UpdateAnnotationsAndLabels(annotations map[string] gomega.Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error { fetchedCluster := &fdbv1beta2.FoundationDBCluster{} err := fdbCluster.getClient(). - Get(ctx.Background(), client.ObjectKeyFromObject(fdbCluster.cluster), fetchedCluster) + Get(context.Background(), client.ObjectKeyFromObject(fdbCluster.cluster), fetchedCluster) if err != nil { return err } @@ -1386,7 +1388,7 @@ func (fdbCluster *FdbCluster) UpdateAnnotationsAndLabels(annotations map[string] fetchedCluster.Labels = labels return fdbCluster.getClient().Patch( - ctx.Background(), + context.Background(), fetchedCluster, patch) })).NotTo(gomega.HaveOccurred()) @@ -1488,8 +1490,241 @@ func (fdbCluster *FdbCluster) UpdateConnectionString(connectionString string) { fdbCluster.UpdateClusterSpec() cm := &corev1.ConfigMap{} - gomega.Expect(fdbCluster.factory.controllerRuntimeClient.Get(ctx.Background(), client.ObjectKey{Namespace: fdbCluster.Namespace(), Name: fdbCluster.Name() + "-config"}, cm)).NotTo(gomega.HaveOccurred()) + gomega.Expect(fdbCluster.factory.controllerRuntimeClient.Get(context.Background(), client.ObjectKey{Namespace: fdbCluster.Namespace(), Name: fdbCluster.Name() + "-config"}, cm)).NotTo(gomega.HaveOccurred()) gomega.Expect(cm.Data).To(gomega.HaveKey(fdbv1beta2.ClusterFileKey)) cm.Data[fdbv1beta2.ClusterFileKey] = connectionString - gomega.Expect(fdbCluster.factory.controllerRuntimeClient.Update(ctx.Background(), cm)).NotTo(gomega.HaveOccurred()) + gomega.Expect(fdbCluster.factory.controllerRuntimeClient.Update(context.Background(), cm)).NotTo(gomega.HaveOccurred()) +} + +// CreateTesterDeployment will create a deployment that runs tester processes with the specified number of replicas. 
+func (fdbCluster *FdbCluster) CreateTesterDeployment(replicas int) *appsv1.Deployment { + deploymentName := fdbCluster.Name() + "-tester" + + deploymentLabels := map[string]string{ + "app": deploymentName, + fdbv1beta2.FDBProcessClassLabel: string(fdbv1beta2.ProcessClassTest), + } + + mainImage := fdbv1beta2.SelectImageConfig(fdbCluster.factory.GetMainContainerOverrides(false, fdbCluster.cluster.UseUnifiedImage()).ImageConfigs, fdbCluster.cluster.Spec.Version).Image() + + var initArgs []string + var sidecarImage string + if fdbCluster.cluster.UseUnifiedImage() { + sidecarImage = mainImage + initArgs = []string{"--mode", "init", "--input-dir", "/var/input-files", "--output-dir", "/var/output-files", "--require-not-empty", "fdb.cluster", "--copy-file", "fdb.cluster"} + } else { + sidecarImage = fdbv1beta2.SelectImageConfig(fdbCluster.factory.GetSidecarContainerOverrides(fdbCluster.cluster.UseUnifiedImage()).ImageConfigs, fdbCluster.cluster.Spec.Version).Image() + initArgs = []string{"--init-mode", "--require-not-empty", "fdb.cluster", "--copy-file", "fdb.cluster"} + } + + deploy := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: deploymentName, + Namespace: fdbCluster.Namespace(), + Labels: deploymentLabels, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: pointer.Int32(int32(replicas)), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": deploymentName, + }, + }, + Strategy: appsv1.DeploymentStrategy{ + Type: appsv1.RecreateDeploymentStrategyType, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: deploymentLabels, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: foundationdbServiceAccount, + SecurityContext: &corev1.PodSecurityContext{ + FSGroup: pointer.Int64(4059), + }, + InitContainers: []corev1.Container{ + { + Name: fdbv1beta2.InitContainerName, + ImagePullPolicy: fdbCluster.factory.getImagePullPolicy(), + Image: sidecarImage, + SecurityContext: &corev1.SecurityContext{ + Privileged: pointer.Bool(true), + AllowPrivilegeEscalation: pointer.Bool(true), // for performance profiling + ReadOnlyRootFilesystem: pointer.Bool( + false, + ), // to allow I/O chaos to succeed + }, + Args: initArgs, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "dynamic-conf", + MountPath: "/var/output-files", + }, + { + Name: "config-map", + MountPath: "/var/input-files", + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: fdbv1beta2.MainContainerName, + ImagePullPolicy: fdbCluster.factory.getImagePullPolicy(), + Image: mainImage, + SecurityContext: &corev1.SecurityContext{ + Privileged: pointer.Bool(true), + AllowPrivilegeEscalation: pointer.Bool(true), // for performance profiling + ReadOnlyRootFilesystem: pointer.Bool( + false, + ), // to allow I/O chaos to succeed + }, + Command: []string{ + "/usr/bin/fdbserver", + }, + Args: []string{ + "--class", + string(fdbv1beta2.ProcessClassTest), + "--public_address", + "[$(" + fdbv1beta2.EnvNamePublicIP + ")]:4500", + "--datadir", + "/var/fdb/data", + "--logdir", + "/var/log/fdb-trace-logs", + "--loggroup", fdbCluster.cluster.GetLogGroup(), + "--locality_zoneid", + "$(" + fdbv1beta2.EnvNameMachineID + ")", + "--locality_dcid", + fdbCluster.cluster.Spec.DataCenter, + }, + Env: []corev1.EnvVar{ + { + Name: fdbv1beta2.EnvNameTLSCert, + Value: "/tmp/fdb-certs/tls.crt", + }, + { + Name: fdbv1beta2.EnvNameTLSCaFile, + Value: "/tmp/fdb-certs/ca.pem", + }, + { + Name: fdbv1beta2.EnvNameTLSKeyFile, + Value: "/tmp/fdb-certs/tls.key", + }, + { + Name: fdbv1beta2.EnvNameTLSVerifyPeers, + 
Value: "I.CN=localhost,I.O=Example Inc.,S.CN=localhost,S.O=Example Inc.", + }, + { + Name: fdbv1beta2.EnvNameFDBTraceLogDirPath, + Value: "/var/log/fdb-trace-logs", + }, + + { + Name: fdbv1beta2.EnvNameClusterFile, + Value: "/var/dynamic-conf/fdb.cluster", + }, + { + Name: fdbv1beta2.EnvNamePublicIP, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "status.podIP", + }, + }, + }, + { + Name: fdbv1beta2.EnvNameMachineID, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "spec.nodeName", + }, + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "fdb-certs", + ReadOnly: true, + MountPath: "/tmp/fdb-certs", + }, + { + Name: "dynamic-conf", + MountPath: "/var/dynamic-conf", + }, + { + Name: "data", + MountPath: "/var/fdb/data", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "config-map", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: fdbCluster.Name() + "-config", + }, + Items: []corev1.KeyToPath{ + { + Key: fdbv1beta2.ClusterFileKey, + Path: "fdb.cluster", + }, + }, + }, + }, + }, + { + Name: "data", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "logs", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "dynamic-conf", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "fdb-certs", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: fdbCluster.factory.GetSecretName(), + }, + }, + }, + }, + }, + }, + }, + } + + gomega.Expect(fdbCluster.factory.controllerRuntimeClient.Create(context.Background(), deploy)).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func(g gomega.Gomega) int { + pods := &corev1.PodList{} + + err := fdbCluster.factory.controllerRuntimeClient.List(context.Background(), pods, + client.InNamespace(fdbCluster.Namespace()), + client.MatchingLabels(map[string]string{"app": deploymentName})) + g.Expect(err).NotTo(gomega.HaveOccurred()) + + var runningReplicas int + for _, pod := range pods.Items { + if pod.Status.Phase == corev1.PodRunning && pod.DeletionTimestamp.IsZero() { + runningReplicas++ + continue + } + } + + return runningReplicas + }).WithTimeout(10 * time.Minute).WithPolling(2 * time.Second).Should(gomega.BeNumerically(">=", replicas)) + + return deploy } diff --git a/e2e/test_operator_ha_upgrades/operator_ha_upgrade_test.go b/e2e/test_operator_ha_upgrades/operator_ha_upgrade_test.go index 019e6cda..2a1ce2ba 100644 --- a/e2e/test_operator_ha_upgrades/operator_ha_upgrade_test.go +++ b/e2e/test_operator_ha_upgrades/operator_ha_upgrade_test.go @@ -628,4 +628,39 @@ var _ = Describe("Operator HA Upgrades", Label("e2e", "pr"), func() { EntryDescription("Upgrade from %[1]s to %[2]s"), fixtures.GenerateUpgradeTableEntries(testOptions), ) + + DescribeTable("when tester processes are running in the primary and remote dc", + func(beforeVersion string, targetVersion string) { + clusterConfig := fixtures.DefaultClusterConfigWithHaMode(fixtures.HaFourZoneSingleSat, false) + + clusterSetupWithTestConfig( + testConfig{ + beforeVersion: beforeVersion, + enableOperatorPodChaos: false, + enableHealthCheck: false, + loadData: false, + clusterConfig: clusterConfig, + }, + ) + + // Start tester processes in the primary side + primaryTester := fdbCluster.GetPrimary().CreateTesterDeployment(4) + // Start 
tester processes in the remote side
+			remoteTester := fdbCluster.GetRemote().CreateTesterDeployment(4)
+
+			// Start the upgrade with the tester processes present.
+			Expect(fdbCluster.UpgradeCluster(targetVersion, false)).NotTo(HaveOccurred())
+			// Verify that the upgrade proceeds
+			fdbCluster.VerifyVersion(targetVersion)
+			// Wait for the primary satellite to reconcile; once it is reconciled, all Pods have been replaced.
+			Expect(fdbCluster.GetPrimarySatellite().WaitForReconciliation()).NotTo(HaveOccurred())
+			// Make sure the cluster has no data loss
+			fdbCluster.GetPrimary().EnsureTeamTrackersHaveMinReplicas()
+
+			factory.Delete(primaryTester)
+			factory.Delete(remoteTester)
+		},
+		EntryDescription("Upgrade from %[1]s to %[2]s"),
+		fixtures.GenerateUpgradeTableEntries(testOptions),
+	)
 })
diff --git a/go.mod b/go.mod
index ccb843b4..0464a444 100644
--- a/go.mod
+++ b/go.mod
@@ -30,10 +30,7 @@ require (
 	sigs.k8s.io/yaml v1.3.0
 )
 
-require (
-	golang.org/x/net v0.23.0
-	golang.org/x/sync v0.3.0
-)
+require golang.org/x/sync v0.3.0
 
 require (
 	github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
@@ -93,6 +90,7 @@ require (
 	go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
 	go.uber.org/multierr v1.10.0 // indirect
 	go.uber.org/zap v1.25.0 // indirect
+	golang.org/x/net v0.23.0 // indirect
 	golang.org/x/oauth2 v0.5.0 // indirect
 	golang.org/x/sys v0.20.0 // indirect
 	golang.org/x/term v0.18.0 // indirect