Fix exclusion logic for multiple processes (#1721)
* Make sure we verify the exclusion of multiple processes correctly
johscheuer authored Jul 4, 2023
1 parent 9325541 commit 70675fd
Showing 3 changed files with 210 additions and 22 deletions.
72 changes: 68 additions & 4 deletions e2e/test_operator_upgrades/operator_upgrades_test.go
@@ -59,12 +59,10 @@ var _ = AfterSuite(func() {
}
})

func clusterSetup(beforeVersion string, availabilityCheck bool) {
func clusterSetupWithConfig(beforeVersion string, availabilityCheck bool, config *fixtures.ClusterConfig) {
factory.SetBeforeVersion(beforeVersion)
fdbCluster = factory.CreateFdbCluster(
&fixtures.ClusterConfig{
DebugSymbols: false,
},
config,
factory.GetClusterOptions(fixtures.UseVersionBeforeUpgrade)...,
)

@@ -79,6 +77,12 @@ func clusterSetup(beforeVersion string, availabilityCheck bool) {
).ShouldNot(HaveOccurred())
}

func clusterSetup(beforeVersion string, availabilityCheck bool) {
clusterSetupWithConfig(beforeVersion, availabilityCheck, &fixtures.ClusterConfig{
DebugSymbols: false,
})
}

// Checks if the cluster is running at the expectedVersion. This is done by checking the status of the FoundationDBCluster resource.
// Before that we checked the cluster status JSON by inspecting the reported version of all processes. This approach only worked for
// version-compatible upgrades, since incompatible processes won't be part of the cluster anyway. To simplify the check
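
As a rough illustration of the check described in the comment above (the helper name below is hypothetical, not the operator's actual API), the verification boils down to comparing the version reported in the FoundationDBCluster status, which is also what the upgrade test further down asserts after reconciliation:

// Hypothetical sketch only: verify an upgrade via the FoundationDBCluster resource
// instead of parsing the cluster status JSON for every process.
func clusterRunsExpectedVersion(cluster *fdbv1beta2.FoundationDBCluster, expectedVersion string) bool {
	// Status.RunningVersion reflects the version the operator has observed the cluster running.
	return cluster.Status.RunningVersion == expectedVersion
}
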
@@ -1071,4 +1075,64 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
fixtures.GenerateUpgradeTableEntries(testOptions),
)

DescribeTable(
"with 2 storage servers per Pod",
func(beforeVersion string, targetVersion string) {
clusterSetupWithConfig(beforeVersion, true, &fixtures.ClusterConfig{
DebugSymbols: false,
StorageServerPerPod: 2,
})

Expect(fdbCluster.UpgradeCluster(targetVersion, false)).NotTo(HaveOccurred())
// Make sure the cluster is still running with 2 storage servers per Pod.
Expect(fdbCluster.GetCluster().Spec.StorageServersPerPod).To(Equal(2))

if !fixtures.VersionsAreProtocolCompatible(beforeVersion, targetVersion) {
// Ensure that the operator is setting the IncorrectConfigMap and IncorrectCommandLine conditions during the upgrade
// process.
expectedConditions := map[fdbv1beta2.ProcessGroupConditionType]bool{
fdbv1beta2.IncorrectConfigMap: true,
fdbv1beta2.IncorrectCommandLine: true,
}
Eventually(func() bool {
cluster := fdbCluster.GetCluster()

for _, processGroup := range cluster.Status.ProcessGroups {
if !processGroup.MatchesConditions(expectedConditions) {
return false
}
}

return true
}).WithTimeout(10 * time.Minute).WithPolling(5 * time.Second).Should(BeTrue())
}

transactionSystemProcessGroups := make(map[fdbv1beta2.ProcessGroupID]fdbv1beta2.None)
// Wait until the cluster is upgraded and fully reconciled.
Expect(fdbCluster.WaitUntilWithForceReconcile(2, 600, func(cluster *fdbv1beta2.FoundationDBCluster) bool {
for _, processGroup := range cluster.Status.ProcessGroups {
if processGroup.ProcessClass == fdbv1beta2.ProcessClassStorage {
continue
}

transactionSystemProcessGroups[processGroup.ProcessGroupID] = fdbv1beta2.None{}
}

// Allow soft reconciliation and make sure the running version was updated
return cluster.Status.Generations.Reconciled == cluster.Generation && cluster.Status.RunningVersion == targetVersion
})).NotTo(HaveOccurred())

// Get the desired process counts based on the current cluster configuration
processCounts, err := fdbCluster.GetProcessCounts()
Expect(err).NotTo(HaveOccurred())

// During an upgrade we expect that the transaction system processes are replaced, so we expect to have seen
// 2 times the process counts for transaction system processes. Add a small buffer of 5 to allow automatic
// replacements during an upgrade.
expectedProcessCounts := (processCounts.Total()-processCounts.Storage)*2 + 5
Expect(len(transactionSystemProcessGroups)).To(BeNumerically("<=", expectedProcessCounts))
},
EntryDescription("Upgrade from %[1]s to %[2]s"),
fixtures.GenerateUpgradeTableEntries(testOptions),
)
})
39 changes: 24 additions & 15 deletions pkg/fdbstatus/status_checks.go
@@ -50,8 +50,8 @@ type exclusionStatus struct {
// getRemainingAndExcludedFromStatus checks which processes of the input address list are excluded in the cluster and which are not.
func getRemainingAndExcludedFromStatus(logger logr.Logger, status *fdbv1beta2.FoundationDBStatus, addresses []fdbv1beta2.ProcessAddress) exclusionStatus {
notExcludedAddresses := map[string]fdbv1beta2.None{}
fullyExcludedAddresses := map[string]fdbv1beta2.None{}
visitedAddresses := map[string]fdbv1beta2.None{}
fullyExcludedAddresses := map[string]int{}
visitedAddresses := map[string]int{}

// If there is more than one active generation we cannot hand out any information about excluded processes based on
// the cluster status information as only the latest log processes will have the log process role. If we don't check
@@ -67,54 +67,63 @@ func getRemainingAndExcludedFromStatus(logger logr.Logger, status *fdbv1beta2.Fo
}
}

addressesToVerify := map[string]fdbv1beta2.None{}
for _, addr := range addresses {
notExcludedAddresses[addr.MachineAddress()] = fdbv1beta2.None{}
addressesToVerify[addr.MachineAddress()] = fdbv1beta2.None{}
}

// Check in the status output which processes are already marked for exclusion in the cluster
for _, process := range status.Cluster.Processes {
if _, ok := notExcludedAddresses[process.Address.MachineAddress()]; !ok {
if _, ok := addressesToVerify[process.Address.MachineAddress()]; !ok {
continue
}

visitedAddresses[process.Address.MachineAddress()] = fdbv1beta2.None{}
visitedAddresses[process.Address.MachineAddress()]++
if !process.Excluded {
notExcludedAddresses[process.Address.MachineAddress()] = fdbv1beta2.None{}
continue
}

if len(process.Roles) == 0 {
fullyExcludedAddresses[process.Address.MachineAddress()] = fdbv1beta2.None{}
fullyExcludedAddresses[process.Address.MachineAddress()]++
}

delete(notExcludedAddresses, process.Address.MachineAddress())
}

exclusions := exclusionStatus{
inProgress: make([]fdbv1beta2.ProcessAddress, 0, len(addresses)-len(notExcludedAddresses)-len(fullyExcludedAddresses)),
inProgress: make([]fdbv1beta2.ProcessAddress, 0, len(addresses)),
fullyExcluded: make([]fdbv1beta2.ProcessAddress, 0, len(fullyExcludedAddresses)),
notExcluded: make([]fdbv1beta2.ProcessAddress, 0, len(notExcludedAddresses)),
missingInStatus: make([]fdbv1beta2.ProcessAddress, 0, len(notExcludedAddresses)),
missingInStatus: make([]fdbv1beta2.ProcessAddress, 0, len(addresses)-len(visitedAddresses)),
}

for _, addr := range addresses {
machine := addr.MachineAddress()
// If we didn't visit that address (absent in the cluster status) we assume it's safe to run the exclude command against it.
// We have to run the exclude command against those addresses, to make sure they are not serving any roles.
if _, ok := visitedAddresses[addr.MachineAddress()]; !ok {
visitedCount, visited := visitedAddresses[machine]
if !visited {
exclusions.missingInStatus = append(exclusions.missingInStatus, addr)
continue
}

// Those addresses are not excluded, so it's not safe to start the exclude command to check if they are fully excluded.
if _, ok := notExcludedAddresses[addr.MachineAddress()]; ok {
if _, ok := notExcludedAddresses[machine]; ok {
exclusions.notExcluded = append(exclusions.notExcluded, addr)
continue
}

// Those are the processes that are marked as excluded and are not serving any roles. It's safe to delete Pods
// that host those processes.
if _, ok := fullyExcludedAddresses[addr.MachineAddress()]; ok {
exclusions.fullyExcluded = append(exclusions.fullyExcluded, addr)
continue
excludedCount, ok := fullyExcludedAddresses[addr.MachineAddress()]
if ok {
// We have to make sure that we have visited as many processes as we have seen fully excluded. Otherwise we might
// return a wrong signal if more than one process is used per Pod. In this case we have to wait for all processes
// to be fully excluded.
if visitedCount == excludedCount {
exclusions.fullyExcluded = append(exclusions.fullyExcluded, addr)
continue
}
logger.Info("found excluded addresses for machine, but not all processes are fully excluded", "visitedCount", visitedCount, "excludedCount", excludedCount, "machine", machine)
}

// Those are the processes that are marked as excluded but still serve at least one role.
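
To make the change in getRemainingAndExcludedFromStatus easier to follow, here is a minimal, self-contained sketch of the counting idea (simplified types, not the operator's actual structures): a machine address only counts as fully excluded once every process reported for it in the cluster status is excluded and holds no roles.

package main

import "fmt"

// process is a simplified stand-in for a process entry in the cluster status.
type process struct {
	machine  string
	excluded bool
	roles    int
}

// fullyExcludedMachines returns the candidate machine addresses for which every
// visited process is excluded and role-free, mirroring the visited/fully-excluded
// counters introduced in the diff above.
func fullyExcludedMachines(processes []process, candidates []string) []string {
	visited := map[string]int{}
	fullyExcluded := map[string]int{}
	candidateSet := map[string]struct{}{}
	for _, c := range candidates {
		candidateSet[c] = struct{}{}
	}

	for _, p := range processes {
		if _, ok := candidateSet[p.machine]; !ok {
			continue
		}
		visited[p.machine]++
		if p.excluded && p.roles == 0 {
			fullyExcluded[p.machine]++
		}
	}

	result := make([]string, 0, len(candidates))
	for _, c := range candidates {
		// Only safe when the counts match; with multiple processes per Pod a single
		// role-free process is not enough.
		if visited[c] > 0 && visited[c] == fullyExcluded[c] {
			result = append(result, c)
		}
	}
	return result
}

func main() {
	status := []process{
		{machine: "192.168.0.4", excluded: true, roles: 0},
		{machine: "192.168.0.4", excluded: true, roles: 1}, // second storage server still serves a role
	}
	// Prints []: only one of the two processes on 192.168.0.4 is role-free so far.
	fmt.Println(fullyExcludedMachines(status, []string{"192.168.0.4"}))
}

This is also why fullyExcludedAddresses and visitedAddresses were changed from map[string]fdbv1beta2.None to map[string]int in the diff: a set cannot distinguish "one process on this machine is fully excluded" from "all processes on this machine are fully excluded".
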
121 changes: 118 additions & 3 deletions pkg/fdbstatus/status_checks_test.go
@@ -64,10 +64,16 @@ var _ = Describe("status_checks", func() {
}

DescribeTable("fetching the excluded and remaining processes from the status",
func(status *fdbv1beta2.FoundationDBStatus, addresses []fdbv1beta2.ProcessAddress, expectedExcluded []fdbv1beta2.ProcessAddress, expectedRemaining []fdbv1beta2.ProcessAddress, expectedFullyExcluded []fdbv1beta2.ProcessAddress, expectedMissing []fdbv1beta2.ProcessAddress) {
func(status *fdbv1beta2.FoundationDBStatus,
addresses []fdbv1beta2.ProcessAddress,
expectedInProgress []fdbv1beta2.ProcessAddress,
expectedNotExcluded []fdbv1beta2.ProcessAddress,
expectedFullyExcluded []fdbv1beta2.ProcessAddress,
expectedMissing []fdbv1beta2.ProcessAddress) {

exclusions := getRemainingAndExcludedFromStatus(logr.Discard(), status, addresses)
Expect(expectedExcluded).To(ConsistOf(exclusions.inProgress))
Expect(expectedRemaining).To(ConsistOf(exclusions.notExcluded))
Expect(expectedInProgress).To(ConsistOf(exclusions.inProgress))
Expect(expectedNotExcluded).To(ConsistOf(exclusions.notExcluded))
Expect(expectedFullyExcluded).To(ConsistOf(exclusions.fullyExcluded))
Expect(expectedMissing).To(ConsistOf(exclusions.missingInStatus))
},
@@ -146,6 +152,115 @@ var _ = Describe("status_checks", func() {
nil,
nil,
),
Entry("when the process group has multiple processes and only one is fully excluded",
&fdbv1beta2.FoundationDBStatus{
Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{
Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{
"1": {
Address: addr1,
Excluded: true,
},
"2": {
Address: addr2,
},
"3": {
Address: addr3,
},
"4-1": {
Address: addr4,
Excluded: true,
Locality: map[string]string{
fdbv1beta2.FDBLocalityProcessIDKey: "4-1",
},
},
"4-2": {
Address: addr4,
Excluded: true,
Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{
{
Role: string(fdbv1beta2.ProcessRoleStorage),
},
},
Locality: map[string]string{
fdbv1beta2.FDBLocalityProcessIDKey: "4-2",
},
},
},
},
},
[]fdbv1beta2.ProcessAddress{addr4},
[]fdbv1beta2.ProcessAddress{addr4},
nil,
nil,
nil,
),
Entry("when the process group has multiple processes and both are fully excluded",
&fdbv1beta2.FoundationDBStatus{
Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{
Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{
"1": {
Address: addr1,
Excluded: true,
},
"2": {
Address: addr2,
},
"3": {
Address: addr3,
},
"4-1": {
Address: addr4,
Excluded: true,
},
"4-2": {
Address: addr4,
Excluded: true,
},
},
},
},
[]fdbv1beta2.ProcessAddress{addr4},
nil,
nil,
[]fdbv1beta2.ProcessAddress{addr4},
nil,
),
Entry("when the process group has multiple processes and only one is excluded",
&fdbv1beta2.FoundationDBStatus{
Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{
Processes: map[fdbv1beta2.ProcessGroupID]fdbv1beta2.FoundationDBStatusProcessInfo{
"1": {
Address: addr1,
Excluded: true,
},
"2": {
Address: addr2,
},
"3": {
Address: addr3,
},
"4-1": {
Address: addr4,
Excluded: true,
},
"4-2": {
Address: addr4,
Excluded: false,
Roles: []fdbv1beta2.FoundationDBStatusProcessRoleInfo{
{
Role: string(fdbv1beta2.ProcessRoleStorage),
},
},
},
},
},
},
[]fdbv1beta2.ProcessAddress{addr4},
nil,
[]fdbv1beta2.ProcessAddress{addr4},
nil,
nil,
),
)
})

