Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2/3 header changes][tchannel] Emit metrics for new headers handling behaviour. Hide actual changes under feature flag. #2272

Open
wants to merge 1 commit into
base: tchannel-refactoring-2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion transport/tchannel/channel_outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ func (o *ChannelOutbound) Call(ctx context.Context, req *transport.Request) (*tr
}

err = getResponseError(headers)
deleteReservedHeaders(headers)
// no check: err will be returned as is
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this comment intentional?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, for some reason I was asking myself "where's error handling?" every time I look at this line, so I decided to left a comment


deleteReservedHeaders(headers, o.transport.reservedHeaderMetric.With(req.Caller, req.Service))

resp := &transport.Response{
Headers: headers,
Expand Down
44 changes: 24 additions & 20 deletions transport/tchannel/channel_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/uber/tchannel-go"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/observability"
"go.uber.org/yarpc/pkg/lifecycle"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -82,13 +83,14 @@ func (options transportOptions) newChannelTransport() *ChannelTransport {
logger = zap.NewNop()
}
return &ChannelTransport{
once: lifecycle.NewOnce(),
ch: options.ch,
addr: options.addr,
tracer: options.tracer,
logger: logger.Named("tchannel"),
originalHeaders: options.originalHeaders,
newResponseWriter: newHandlerWriter,
once: lifecycle.NewOnce(),
ch: options.ch,
addr: options.addr,
tracer: options.tracer,
logger: logger.Named("tchannel"),
originalHeaders: options.originalHeaders,
newResponseWriter: newHandlerWriter,
reservedHeaderMetric: observability.NewReserveHeaderMetrics(options.meter, TransportName+"_channel"),
}
}

Expand All @@ -97,14 +99,15 @@ func (options transportOptions) newChannelTransport() *ChannelTransport {
// If you have a YARPC peer.Chooser, use the unqualified tchannel.Transport
// instead.
type ChannelTransport struct {
once *lifecycle.Once
ch Channel
addr string
tracer opentracing.Tracer
logger *zap.Logger
router transport.Router
originalHeaders bool
newResponseWriter func(inboundCallResponse, tchannel.Format, headerCase) responseWriter
once *lifecycle.Once
ch Channel
addr string
tracer opentracing.Tracer
logger *zap.Logger
router transport.Router
originalHeaders bool
newResponseWriter responseWriterConstructor
reservedHeaderMetric *observability.ReservedHeaderMetrics
}

// Channel returns the underlying TChannel "Channel" instance.
Expand Down Expand Up @@ -140,11 +143,12 @@ func (t *ChannelTransport) start() error {
sc := t.ch.GetSubChannel(s)
existing := sc.GetHandlers()
sc.SetHandler(handler{
existing: existing,
router: t.router,
tracer: t.tracer,
logger: t.logger,
newResponseWriter: t.newResponseWriter,
existing: existing,
router: t.router,
tracer: t.tracer,
logger: t.logger,
reservedHeaderMetrics: t.reservedHeaderMetric,
newResponseWriter: t.newResponseWriter,
})
}
}
Expand Down
5 changes: 4 additions & 1 deletion transport/tchannel/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/bufferpool"
"go.uber.org/yarpc/internal/observability"
"go.uber.org/yarpc/pkg/errors"
"go.uber.org/yarpc/yarpcerrors"
"go.uber.org/zap"
Expand Down Expand Up @@ -99,6 +100,7 @@ type handler struct {
tracer opentracing.Tracer
headerCase headerCase
logger *zap.Logger
reservedHeaderMetrics *observability.ReservedHeaderMetrics
newResponseWriter responseWriterConstructor
excludeServiceHeaderInResponse bool
}
Expand All @@ -109,7 +111,7 @@ func (h handler) Handle(ctx ncontext.Context, call *tchannel.InboundCall) {

func (h handler) handle(ctx context.Context, call inboundCall) {
// you MUST close the responseWriter no matter what unless you have a tchannel.SystemError
responseWriter := h.newResponseWriter(call.Response(), call.Format(), h.headerCase)
responseWriter := h.newResponseWriter(call.Response(), call.Format(), h.headerCase, h.reservedHeaderMetrics.With(call.CallerName(), call.ServiceName()))
defer responseWriter.ReleaseBuffer()

if !h.excludeServiceHeaderInResponse {
Expand Down Expand Up @@ -183,6 +185,7 @@ func (h handler) callHandler(ctx context.Context, call inboundCall, responseWrit
}

transportHeadersToRequest(treq, headers)
deleteReservedPrefixHeaders(headers, h.reservedHeaderMetrics.With(call.CallerName(), call.ServiceName()))
treq.Headers = headers

if tcall, ok := call.(tchannelCall); ok {
Expand Down
9 changes: 5 additions & 4 deletions transport/tchannel/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.uber.org/yarpc/api/transport/transporttest"
"go.uber.org/yarpc/encoding/json"
"go.uber.org/yarpc/encoding/raw"
"go.uber.org/yarpc/internal/observability"
"go.uber.org/yarpc/internal/routertest"
"go.uber.org/yarpc/internal/testtime"
pkgerrors "go.uber.org/yarpc/pkg/errors"
Expand Down Expand Up @@ -580,7 +581,7 @@ func TestResponseWriter(t *testing.T) {
resp := newResponseRecorder()
call.resp = resp

w := newHandlerWriter(call.Response(), call.Format(), tt.headerCase)
w := newHandlerWriter(call.Response(), call.Format(), tt.headerCase, observability.ReservedHeaderEdgeMetrics{})
tt.apply(w)
assert.NoError(t, w.Close())

Expand Down Expand Up @@ -623,7 +624,7 @@ func TestResponseWriterFailure(t *testing.T) {
resp := newResponseRecorder()
tt.setupResp(resp)

w := newHandlerWriter(resp, tchannel.Raw, canonicalizedHeaderCase)
w := newHandlerWriter(resp, tchannel.Raw, canonicalizedHeaderCase, observability.ReservedHeaderEdgeMetrics{})
_, err := w.Write([]byte("foo"))
assert.NoError(t, err)
_, err = w.Write([]byte("bar"))
Expand All @@ -638,7 +639,7 @@ func TestResponseWriterFailure(t *testing.T) {

func TestResponseWriterEmptyBodyHeaders(t *testing.T) {
res := newResponseRecorder()
w := newHandlerWriter(res, tchannel.Raw, canonicalizedHeaderCase)
w := newHandlerWriter(res, tchannel.Raw, canonicalizedHeaderCase, observability.ReservedHeaderEdgeMetrics{})

w.AddHeaders(transport.NewHeaders().With("foo", "bar"))
require.NoError(t, w.Close())
Expand Down Expand Up @@ -809,7 +810,7 @@ func TestRpcServiceHeader(t *testing.T) {
hw := &responseWriterImpl{}
h := handler{
headerCase: canonicalizedHeaderCase,
newResponseWriter: func(inboundCallResponse, tchannel.Format, headerCase) responseWriter {
newResponseWriter: func(inboundCallResponse, tchannel.Format, headerCase, observability.ReservedHeaderEdgeMetrics) responseWriter {
return hw
},
}
Expand Down
55 changes: 54 additions & 1 deletion transport/tchannel/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/uber/tchannel-go"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/observability"
"go.uber.org/yarpc/transport/tchannel/internal"
"go.uber.org/yarpc/yarpcerrors"
)
Expand Down Expand Up @@ -68,11 +69,25 @@ var _reservedHeaderKeys = map[string]struct{}{
CallerProcedureHeader: {},
}

var (
// enforceHeaderRules is a feature flag for a more strict header handling rules.
// If true and isReservedHeaderPrefix also true, an error will be returned for
// attempt to set such header; header will be stripped for incoming requests and receiving responses.
// See https://github.com/yarpc/yarpc-go/issues/2265 for more details.
enforceHeaderRules = false
)

// isReservedHeaderKey checks header name by exact match.
func isReservedHeaderKey(key string) bool {
_, ok := _reservedHeaderKeys[strings.ToLower(key)]
return ok
}

// isReservedHeaderPrefix checks header name by prefix match.
func isReservedHeaderPrefix(header string) bool {
return strings.HasPrefix(strings.ToLower(header), "rpc-") || strings.HasPrefix(strings.ToLower(header), "$rpc$-")
}

// readRequestHeaders reads headers and baggage from an incoming request.
func readRequestHeaders(
ctx context.Context,
Expand Down Expand Up @@ -236,10 +251,48 @@ func getHeaderMap(hs transport.Headers, headerCase headerCase) map[string]string
}
}

func deleteReservedHeaders(headers transport.Headers) {
func findReservedHeaderPrefix(headers map[string]string) (string, bool) {
for key := range headers {
if isReservedHeaderPrefix(key) {
return key, true
}
}
return "", false
}

func validateApplicationHeaders(headers map[string]string, edgeMetrics observability.ReservedHeaderEdgeMetrics) error {
key, found := findReservedHeaderPrefix(headers)
if !found {
return nil
}

edgeMetrics.IncError()

if enforceHeaderRules {
return yarpcerrors.InternalErrorf("header with rpc prefix is not allowed in request application headers (%s was passed)", key)
}
return nil
}

func deleteReservedHeaders(headers transport.Headers, edgeMetrics observability.ReservedHeaderEdgeMetrics) {
for headerKey := range _reservedHeaderKeys {
headers.Del(headerKey)
}

deleteReservedPrefixHeaders(headers, edgeMetrics)
}

func deleteReservedPrefixHeaders(headers transport.Headers, edgeMetrics observability.ReservedHeaderEdgeMetrics) {
for key := range headers.Items() {
if !isReservedHeaderPrefix(key) {
continue
}

edgeMetrics.IncStripped()
if enforceHeaderRules {
headers.Del(key)
}
}
}

// this check ensures that the service we're issuing a request to is the one
Expand Down
Loading