Skip to content

Commit

Permalink
Merge branch 'main' into als-metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
zirain committed Jul 5, 2024
2 parents c3901a1 + abb7c96 commit 31595c8
Show file tree
Hide file tree
Showing 20 changed files with 563 additions and 150 deletions.
2 changes: 2 additions & 0 deletions charts/gateway-addons-helm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ To uninstall the chart:
| opentelemetry-collector.config.processors.attributes.actions[0].value | string | `"k8s.pod.name, k8s.namespace.name"` | |
| opentelemetry-collector.config.receivers.otlp.protocols.grpc.endpoint | string | `"${env:MY_POD_IP}:4317"` | |
| opentelemetry-collector.config.receivers.otlp.protocols.http.endpoint | string | `"${env:MY_POD_IP}:4318"` | |
| opentelemetry-collector.config.receivers.zipkin.endpoint | string | `"${env:MY_POD_IP}:9411"` | |
| opentelemetry-collector.config.service.extensions[0] | string | `"health_check"` | |
| opentelemetry-collector.config.service.pipelines.logs.exporters[0] | string | `"loki"` | |
| opentelemetry-collector.config.service.pipelines.logs.processors[0] | string | `"attributes"` | |
Expand All @@ -120,6 +121,7 @@ To uninstall the chart:
| opentelemetry-collector.config.service.pipelines.metrics.receivers[0] | string | `"otlp"` | |
| opentelemetry-collector.config.service.pipelines.traces.exporters[0] | string | `"otlp"` | |
| opentelemetry-collector.config.service.pipelines.traces.receivers[0] | string | `"otlp"` | |
| opentelemetry-collector.config.service.pipelines.traces.receivers[1] | string | `"zipkin"` | |
| opentelemetry-collector.enabled | bool | `false` | |
| opentelemetry-collector.fullnameOverride | string | `"otel-collector"` | |
| opentelemetry-collector.mode | string | `"deployment"` | |
Expand Down
3 changes: 3 additions & 0 deletions charts/gateway-addons-helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ opentelemetry-collector:
# Loki will convert this to k8s_pod_name label.
value: k8s.pod.name, k8s.namespace.name
receivers:
zipkin:
endpoint: ${env:MY_POD_IP}:9411
otlp:
protocols:
grpc:
Expand Down Expand Up @@ -230,3 +232,4 @@ opentelemetry-collector:
- otlp
receivers:
- otlp
- zipkin
46 changes: 33 additions & 13 deletions internal/gatewayapi/status/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ func UpdateGatewayStatusProgrammedCondition(gw *gwapiv1.Gateway, svc *corev1.Ser
} else {
gw.Status.Addresses = nil
}

// Update the programmed condition.
gw.Status.Conditions = MergeConditions(gw.Status.Conditions, computeGatewayProgrammedCondition(gw, deployment))
updateGatewayProgrammedCondition(gw, deployment)
}

func SetGatewayListenerStatusCondition(gateway *gwapiv1.Gateway, listenerStatusIdx int,
Expand Down Expand Up @@ -128,26 +129,45 @@ func computeGatewayAcceptedCondition(gw *gwapiv1.Gateway, accepted bool) metav1.
}
}

// computeGatewayProgrammedCondition computes the Gateway Programmed status condition.
const (
messageAddressNotAssigned = "No addresses have been assigned to the Gateway"
messageFmtTooManyAddresses = "Too many addresses (%d) have been assigned to the Gateway, the maximum number of addresses is 16"
messageNoResources = "Deployment replicas unavailable"
messageFmtProgrammed = "Address assigned to the Gateway, %d/%d envoy Deployment replicas available"
)

// updateGatewayProgrammedCondition computes the Gateway Programmed status condition.
// Programmed condition surfaces true when the Envoy Deployment status is ready.
func computeGatewayProgrammedCondition(gw *gwapiv1.Gateway, deployment *appsv1.Deployment) metav1.Condition {
func updateGatewayProgrammedCondition(gw *gwapiv1.Gateway, deployment *appsv1.Deployment) {
if len(gw.Status.Addresses) == 0 {
return newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionFalse,
string(gwapiv1.GatewayReasonAddressNotAssigned),
"No addresses have been assigned to the Gateway", time.Now(), gw.Generation)
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionFalse, string(gwapiv1.GatewayReasonAddressNotAssigned),
messageAddressNotAssigned, time.Now(), gw.Generation))
return
}

if len(gw.Status.Addresses) > 16 {
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionFalse, string(gwapiv1.GatewayReasonInvalid),
fmt.Sprintf(messageFmtTooManyAddresses, len(gw.Status.Addresses)), time.Now(), gw.Generation))

// Truncate the addresses to 16
// so that the status can be updated successfully.
gw.Status.Addresses = gw.Status.Addresses[:16]
return
}

// If there are no available replicas for the Envoy Deployment, don't
// mark the Gateway as ready yet.

if deployment == nil || deployment.Status.AvailableReplicas == 0 {
return newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionFalse,
string(gwapiv1.GatewayReasonNoResources),
"Deployment replicas unavailable", time.Now(), gw.Generation)
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionFalse, string(gwapiv1.GatewayReasonNoResources),
messageNoResources, time.Now(), gw.Generation))
return
}

message := fmt.Sprintf("Address assigned to the Gateway, %d/%d envoy Deployment replicas available",
deployment.Status.AvailableReplicas, deployment.Status.Replicas)
return newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionTrue,
string(gwapiv1.GatewayConditionProgrammed), message, time.Now(), gw.Generation)
gw.Status.Conditions = MergeConditions(gw.Status.Conditions,
newCondition(string(gwapiv1.GatewayConditionProgrammed), metav1.ConditionTrue, string(gwapiv1.GatewayConditionProgrammed),
fmt.Sprintf(messageFmtProgrammed, deployment.Status.AvailableReplicas, deployment.Status.Replicas), time.Now(), gw.Generation))
}
211 changes: 147 additions & 64 deletions internal/gatewayapi/status/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
package status

import (
"fmt"
"reflect"
"strconv"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -17,24 +21,26 @@ import (
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
)

// TestUpdateGatewayStatusProgrammedCondition tests whether UpdateGatewayStatusProgrammedCondition correctly updates the addresses in the Gateway status.
func TestUpdateGatewayStatusProgrammedCondition(t *testing.T) {
type args struct {
gw *gwapiv1.Gateway
svc *corev1.Service
deployment *appsv1.Deployment
addresses []gwapiv1.GatewayStatusAddress
gw *gwapiv1.Gateway
svc *corev1.Service
deployment *appsv1.Deployment
nodeAddresses []string
}
tests := []struct {
name string
args args
name string
args args
wantAddresses []gwapiv1.GatewayStatusAddress
}{
{
name: "nil svc",
args: args{
gw: &gwapiv1.Gateway{},
svc: nil,
addresses: nil,
gw: &gwapiv1.Gateway{},
svc: nil,
},
wantAddresses: nil,
},
{
name: "LoadBalancer svc with ingress ip",
Expand All @@ -57,11 +63,11 @@ func TestUpdateGatewayStatusProgrammedCondition(t *testing.T) {
},
},
},
addresses: []gwapiv1.GatewayStatusAddress{
{
Type: ptr.To(gwapiv1.IPAddressType),
Value: "127.0.0.1",
},
},
wantAddresses: []gwapiv1.GatewayStatusAddress{
{
Type: ptr.To(gwapiv1.IPAddressType),
Value: "127.0.0.1",
},
},
},
Expand All @@ -86,15 +92,15 @@ func TestUpdateGatewayStatusProgrammedCondition(t *testing.T) {
},
},
},
addresses: []gwapiv1.GatewayStatusAddress{
{
Type: ptr.To(gwapiv1.IPAddressType),
Value: "127.0.0.1",
},
{
Type: ptr.To(gwapiv1.HostnameAddressType),
Value: "localhost",
},
},
wantAddresses: []gwapiv1.GatewayStatusAddress{
{
Type: ptr.To(gwapiv1.IPAddressType),
Value: "127.0.0.1",
},
{
Type: ptr.To(gwapiv1.HostnameAddressType),
Value: "localhost",
},
},
},
Expand All @@ -110,55 +116,135 @@ func TestUpdateGatewayStatusProgrammedCondition(t *testing.T) {
Type: corev1.ServiceTypeClusterIP,
},
},
addresses: []gwapiv1.GatewayStatusAddress{
{
Type: ptr.To(gwapiv1.IPAddressType),
Value: "127.0.0.1",
},
wantAddresses: []gwapiv1.GatewayStatusAddress{
{
Type: ptr.To(gwapiv1.IPAddressType),
Value: "127.0.0.1",
},
},
},
{
name: "Nodeport svc",
args: args{
gw: &gwapiv1.Gateway{},
nodeAddresses: []string{"1", "2"},
svc: &corev1.Service{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
},
},
},
wantAddresses: []gwapiv1.GatewayStatusAddress{
{
Type: ptr.To(gwapiv1.IPAddressType),
Value: "1",
},
{
Type: ptr.To(gwapiv1.IPAddressType),
Value: "2",
},
},
},
{
name: "Nodeport svc with too many node addresses",
args: args{
gw: &gwapiv1.Gateway{},
// 20 node addresses
nodeAddresses: func() (addr []string) {
for i := 0; i < 20; i++ {
addr = append(addr, strconv.Itoa(i))
}
return
}(),
svc: &corev1.Service{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
},
},
},
// Only the first 16 addresses should be set.
wantAddresses: func() (addr []gwapiv1.GatewayStatusAddress) {
for i := 0; i < 16; i++ {
addr = append(addr, gwapiv1.GatewayStatusAddress{
Type: ptr.To(gwapiv1.IPAddressType),
Value: strconv.Itoa(i),
})
}
return
}(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
UpdateGatewayStatusProgrammedCondition(tt.args.gw, tt.args.svc, tt.args.deployment)
assert.True(t, reflect.DeepEqual(tt.args.addresses, tt.args.gw.Status.Addresses))
UpdateGatewayStatusProgrammedCondition(tt.args.gw, tt.args.svc, tt.args.deployment, tt.args.nodeAddresses...)
assert.True(t, reflect.DeepEqual(tt.wantAddresses, tt.args.gw.Status.Addresses))
})
}
}

func TestGatewayReadyCondition(t *testing.T) {
func TestUpdateGatewayProgrammedCondition(t *testing.T) {
testCases := []struct {
name string
serviceAddress bool
deploymentStatus appsv1.DeploymentStatus
expect metav1.Condition
name string
// serviceAddressNum indicates how many addresses are set in the Gateway status.
serviceAddressNum int
deploymentStatus appsv1.DeploymentStatus
expectCondition []metav1.Condition
}{
{
name: "ready gateway",
serviceAddress: true,
deploymentStatus: appsv1.DeploymentStatus{AvailableReplicas: 1},
expect: metav1.Condition{
Status: metav1.ConditionTrue,
Reason: string(gwapiv1.GatewayConditionProgrammed),
name: "ready gateway",
serviceAddressNum: 1,
deploymentStatus: appsv1.DeploymentStatus{AvailableReplicas: 1, Replicas: 1},
expectCondition: []metav1.Condition{
{
Type: string(gwapiv1.GatewayConditionProgrammed),
Status: metav1.ConditionTrue,
Reason: string(gwapiv1.GatewayConditionProgrammed),
Message: fmt.Sprintf(messageFmtProgrammed, 1, 1),
},
},
},
{
name: "not ready gateway without address",
serviceAddress: false,
deploymentStatus: appsv1.DeploymentStatus{AvailableReplicas: 1},
expect: metav1.Condition{
Status: metav1.ConditionFalse,
Reason: string(gwapiv1.GatewayReasonAddressNotAssigned),
name: "not ready gateway without address",
serviceAddressNum: 0,
deploymentStatus: appsv1.DeploymentStatus{AvailableReplicas: 1},
expectCondition: []metav1.Condition{
{
Type: string(gwapiv1.GatewayConditionProgrammed),
Status: metav1.ConditionFalse,
Reason: string(gwapiv1.GatewayReasonAddressNotAssigned),
Message: messageAddressNotAssigned,
},
},
},
{
name: "not ready gateway with address unavailable pods",
serviceAddress: true,
deploymentStatus: appsv1.DeploymentStatus{AvailableReplicas: 0},
expect: metav1.Condition{
Status: metav1.ConditionFalse,
Reason: string(gwapiv1.GatewayReasonNoResources),
name: "not ready gateway with too many addresses",
serviceAddressNum: 17,
deploymentStatus: appsv1.DeploymentStatus{AvailableReplicas: 1},
expectCondition: []metav1.Condition{
{
Type: string(gwapiv1.GatewayConditionProgrammed),
Status: metav1.ConditionFalse,
Reason: string(gwapiv1.GatewayReasonInvalid),
Message: fmt.Sprintf(messageFmtTooManyAddresses, 17),
},
},
},
{
name: "not ready gateway with address unavailable pods",
serviceAddressNum: 1,
deploymentStatus: appsv1.DeploymentStatus{AvailableReplicas: 0},
expectCondition: []metav1.Condition{
{
Type: string(gwapiv1.GatewayConditionProgrammed),
Status: metav1.ConditionFalse,
Reason: string(gwapiv1.GatewayReasonNoResources),
Message: messageNoResources,
},
},
},
}
Expand All @@ -168,23 +254,20 @@ func TestGatewayReadyCondition(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
gtw := &gwapiv1.Gateway{}
if tc.serviceAddress {
gtw.Status = gwapiv1.GatewayStatus{
Addresses: []gwapiv1.GatewayStatusAddress{
{
Type: ptr.To(gwapiv1.IPAddressType),
Value: "1.1.1.1",
},
},
gtw.Status.Addresses = make([]gwapiv1.GatewayStatusAddress, tc.serviceAddressNum)
for i := 0; i < tc.serviceAddressNum; i++ {
gtw.Status.Addresses[i] = gwapiv1.GatewayStatusAddress{
Type: ptr.To(gwapiv1.IPAddressType),
Value: strconv.Itoa(i),
}
}

deployment := &appsv1.Deployment{Status: tc.deploymentStatus}
got := computeGatewayProgrammedCondition(gtw, deployment)
updateGatewayProgrammedCondition(gtw, deployment)

assert.Equal(t, string(gwapiv1.GatewayConditionProgrammed), got.Type)
assert.Equal(t, tc.expect.Status, got.Status)
assert.Equal(t, tc.expect.Reason, got.Reason)
if d := cmp.Diff(tc.expectCondition, gtw.Status.Conditions, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")); d != "" {
t.Errorf("unexpected condition diff: %s", d)
}
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/wasm/httpfetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestWasmHTTPInsecureServer(t *testing.T) {
defer ts.Close()
fetcher := NewHTTPFetcher(DefaultHTTPRequestTimeout, DefaultHTTPRequestMaxRetries, logging.DefaultLogger(egv1a1.LogLevelInfo))
fetcher.initialBackoff = time.Microsecond
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
b, err := fetcher.Fetch(ctx, ts.URL, c.insecure)
if c.wantNumRequest != gotNumRequest {
Expand Down
Loading

0 comments on commit 31595c8

Please sign in to comment.