Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: replace klog with slog #37

Merged
merged 7 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ run:
modules-download-mode: readonly
linters:
enable:
- exhaustive
- exportloopref
- dupl
- exportloopref
- gochecknoinits
- goconst
- gocritic
Expand All @@ -22,10 +21,6 @@ linters:
- usestdlibvars
- whitespace
linters-settings:
exhaustive:
# presence of "default" case in switch statements satisfies exhaustiveness,
# even if all enum members are not listed
default-signifies-exhaustive: true
issues:
exclude-rules:
- path: _test.go
Expand Down
59 changes: 27 additions & 32 deletions cmd/client/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package main

import (
"fmt"
"log/slog"
"net"
"strconv"

"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/klog/v2"

"github.com/knight42/krelay/pkg/constants"
"github.com/knight42/krelay/pkg/ports"
"github.com/knight42/krelay/pkg/remoteaddr"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xnet"
)

Expand Down Expand Up @@ -53,14 +54,16 @@ func (p *portForwarder) listen(localIP string) error {
func (p *portForwarder) run(streamConn httpstream.Connection) {
switch {
case p.tcpListener != nil:
l := p.tcpListener
defer l.Close()

localAddr := l.Addr().String()
klog.InfoS("Forwarding",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
constants.LogFieldRemotePort, p.ports.RemotePort,
lis := p.tcpListener
defer lis.Close()

localAddr := lis.Addr().String()
l := slog.With(
slog.String(constants.LogFieldProtocol, p.ports.Protocol),
slog.String(constants.LogFieldLocalAddr, localAddr),
)
l.Info("Forwarding",
slogutil.Uint16(constants.LogFieldRemotePort, p.ports.RemotePort),
)

for {
Expand All @@ -70,21 +73,15 @@ func (p *portForwarder) run(streamConn httpstream.Connection) {
default:
}

c, err := l.Accept()
c, err := lis.Accept()
if err != nil {
klog.ErrorS(err, "Fail to accept tcp connection",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
)
l.Error("Fail to accept tcp connection", slogutil.Error(err))
return
}

remoteAddr, err := p.addrGetter.Get()
if err != nil {
klog.ErrorS(err, "Fail to get remote address",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
)
l.Error("Fail to get remote address", slogutil.Error(err))
continue
}
go handleTCPConn(c, streamConn, remoteAddr, p.ports.RemotePort)
Expand All @@ -96,21 +93,21 @@ func (p *portForwarder) run(streamConn httpstream.Connection) {

udpConn := &xnet.UDPConn{UDPConn: pc.(*net.UDPConn)}
localAddr := pc.LocalAddr().String()
klog.InfoS("Forwarding",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
constants.LogFieldRemotePort, p.ports.RemotePort,
l := slog.With(
slog.String(constants.LogFieldProtocol, p.ports.Protocol),
slog.String(constants.LogFieldLocalAddr, localAddr),
)
l.Info("Forwarding",
slogutil.Uint16(constants.LogFieldRemotePort, p.ports.RemotePort),
)
track := newConnTrack()
finish := make(chan string)

go func() {
for key := range finish {
track.Delete(key)
klog.V(4).InfoS("Remove udp conn from conntrack table",
"key", key,
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
l.Debug("Remove udp conn from conntrack table",
slog.String("key", key),
)
}
}()
Expand All @@ -128,9 +125,8 @@ func (p *portForwarder) run(streamConn httpstream.Connection) {

n, cliAddr, err := udpConn.ReadFrom(buf)
if err != nil {
klog.ErrorS(err, "Fail to read udp packet",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
l.Error("Fail to read udp packet",
slogutil.Error(err),
)
return
}
Expand All @@ -146,9 +142,8 @@ func (p *portForwarder) run(streamConn httpstream.Connection) {
track.Set(key, dataCh)
remoteAddr, err := p.addrGetter.Get()
if err != nil {
klog.ErrorS(err, "Fail to get remote address",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
l.Error("Fail to get remote address",
slogutil.Error(err),
)
continue
}
Expand Down
50 changes: 20 additions & 30 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
Expand All @@ -22,11 +22,11 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport/spdy"
"k8s.io/klog/v2"

"github.com/knight42/krelay/pkg/constants"
"github.com/knight42/krelay/pkg/ports"
"github.com/knight42/krelay/pkg/remoteaddr"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xnet"
)

Expand All @@ -41,6 +41,8 @@ type Options struct {
address string
// targetsFile is the file containing the list of targets.
targetsFile string

verbosity int
}

// setKubernetesDefaults sets default values on the provided client config for accessing the Kubernetes API.
Expand Down Expand Up @@ -153,7 +155,7 @@ func (o *Options) Run(ctx context.Context, args []string) error {
}
}

klog.InfoS("Creating krelay-server", "namespace", o.serverNamespace)
slog.Info("Creating krelay-server", slog.String("namespace", o.serverNamespace))
svrPodName, err := createServerPod(ctx, cs, o.serverImage, o.serverNamespace)
if err != nil {
return fmt.Errorf("create krelay-server pod: %w", err)
Expand All @@ -164,7 +166,7 @@ func (o *Options) Run(ctx context.Context, args []string) error {
if err != nil {
return fmt.Errorf("ensure krelay-server is running: %w", err)
}
klog.InfoS("krelay-server is running", "pod", svrPodName, "namespace", o.serverNamespace)
slog.Info("krelay-server is running", slog.String("pod", svrPodName), slog.String("namespace", o.serverNamespace))

transport, upgrader, err := spdy.RoundTripperFor(restCfg)
if err != nil {
Expand All @@ -191,7 +193,7 @@ func (o *Options) Run(ctx context.Context, args []string) error {
for _, pf := range portForwarders {
err := pf.listen(o.address)
if err != nil {
klog.ErrorS(err, "Fail to listen on port", "port", pf.ports.LocalPort)
slog.Error("Fail to listen on port", slog.Any("port", pf.ports.LocalPort), slog.Any("error", err))
} else {
succeeded = true
}
Expand All @@ -204,15 +206,14 @@ func (o *Options) Run(ctx context.Context, args []string) error {

select {
case <-streamConn.CloseChan():
klog.InfoS("Lost connection to krelay-server pod")
slog.Info("Lost connection to krelay-server pod")
case <-ctx.Done():
}

return nil
}

func main() {
klog.InitFlags(nil)
cf := genericclioptions.NewConfigFlags(true)
o := Options{
getter: cf,
Expand All @@ -237,40 +238,29 @@ service, ip and hostname rather than only pods.`,
})
}

h := slog.NewTextHandler(cmd.ErrOrStderr(), &slog.HandlerOptions{
Level: slogutil.MapVerbosityToLogLevel(o.verbosity),
})
slog.SetDefault(slog.New(h))
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
return o.Run(ctx, args)
},
SilenceUsage: true,
}
flags := c.Flags()
flags.AddGoFlagSet(flag.CommandLine)
cf.AddFlags(flags)
flags.SortFlags = false
flags.StringVar(cf.KubeConfig, "kubeconfig", *cf.KubeConfig, "Path to the kubeconfig file to use for CLI requests.")
flags.StringVarP(cf.Namespace, "namespace", "n", *cf.Namespace, "If present, the namespace scope for this CLI request")
flags.StringVar(cf.Context, "context", *cf.Context, "The name of the kubeconfig context to use")
flags.StringVar(cf.ClusterName, "cluster", *cf.ClusterName, "The name of the kubeconfig cluster to use")

flags.BoolVarP(&printVersion, "version", "V", false, "Print version info and exit.")
flags.StringVar(&o.address, "address", "127.0.0.1", "Address to listen on. Only accepts IP addresses as a value.")
flags.StringVarP(&o.targetsFile, "file", "f", "", "Forward to the targets specified in the given file, with one target per line.")
flags.StringVar(&o.serverImage, "server.image", "ghcr.io/knight42/krelay-server:v0.0.2", "The krelay-server image to use.")
flags.StringVar(&o.serverNamespace, "server.namespace", metav1.NamespaceDefault, "The namespace in which krelay-server is located.")
flags.StringVarP(&o.targetsFile, "file", "f", "", "Forward to the targets specified in the given file, with one target per line.")

// I do not want these flags to show up in --help.
hiddenFlags := []string{
"add_dir_header",
"log_flush_frequency",
"alsologtostderr",
"log_backtrace_at",
"log_dir",
"log_file",
"log_file_max_size",
"one_output",
"logtostderr",
"skip_headers",
"skip_log_headers",
"stderrthreshold",
"vmodule",
}
for _, flagName := range hiddenFlags {
_ = flags.MarkHidden(flagName)
}
flags.IntVarP(&o.verbosity, "v", "v", 3, "Number for the log level verbosity. The bigger the more verbose.")

_ = c.Execute()
}
30 changes: 15 additions & 15 deletions cmd/client/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package main

import (
"io"
"log/slog"
"net"

"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/klog/v2"

"github.com/knight42/krelay/pkg/constants"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xio"
"github.com/knight42/krelay/pkg/xnet"
)
Expand All @@ -16,18 +17,17 @@ func handleTCPConn(clientConn net.Conn, serverConn httpstream.Connection, dstAdd
defer clientConn.Close()

requestID := xnet.NewRequestID()
kvs := []any{constants.LogFieldRequestID, requestID}
defer klog.V(4).InfoS("handleTCPConn exit", kvs...)
klog.InfoS("Handling tcp connection",
constants.LogFieldRequestID, requestID,
constants.LogFieldDestAddr, xnet.JoinHostPort(dstAddr.String(), dstPort),
constants.LogFieldLocalAddr, clientConn.LocalAddr().String(),
"clientAddr", clientConn.RemoteAddr().String(),
l := slog.With(slog.String(constants.LogFieldRequestID, requestID))
defer l.Debug("handleTCPConn exit")
l.Info("Handling tcp connection",
slog.String(constants.LogFieldDestAddr, xnet.JoinHostPort(dstAddr.String(), dstPort)),
slog.String(constants.LogFieldLocalAddr, clientConn.LocalAddr().String()),
slog.String("clientAddr", clientConn.RemoteAddr().String()),
)

dataStream, errorChan, err := createStream(serverConn, requestID)
if err != nil {
klog.ErrorS(err, "Fail to create stream", kvs...)
l.Error("Fail to create stream", slogutil.Error(err))
return
}

Expand All @@ -39,18 +39,18 @@ func handleTCPConn(clientConn net.Conn, serverConn httpstream.Connection, dstAdd
}
_, err = xio.WriteFull(dataStream, hdr.Marshal())
if err != nil {
klog.ErrorS(err, "Fail to write header", kvs...)
l.Error("Fail to write header", slogutil.Error(err))
return
}

var ack xnet.Acknowledgement
err = ack.FromReader(dataStream)
if err != nil {
klog.ErrorS(err, "Fail to receive ack", kvs...)
l.Error("Fail to receive ack", slogutil.Error(err))
return
}
if ack.Code != xnet.AckCodeOK {
klog.ErrorS(ack.Code, "Fail to connect", kvs...)
l.Error("Fail to connect", slogutil.Error(ack.Code))
return
}

Expand All @@ -60,7 +60,7 @@ func handleTCPConn(clientConn net.Conn, serverConn httpstream.Connection, dstAdd
go func() {
// Copy from the remote side to the local port.
if _, err := io.Copy(clientConn, dataStream); err != nil && !xnet.IsClosedConnectionError(err) {
klog.ErrorS(err, "Fail to copy from remote stream to local connection", kvs...)
l.Error("Fail to copy from remote stream to local connection", slogutil.Error(err))
}

// inform the select below that the remote copy is done
Expand All @@ -73,7 +73,7 @@ func handleTCPConn(clientConn net.Conn, serverConn httpstream.Connection, dstAdd

// Copy from the local port to the remote side.
if _, err := io.Copy(dataStream, clientConn); err != nil && !xnet.IsClosedConnectionError(err) {
klog.ErrorS(err, "Fail to copy from local connection to remote stream", kvs...)
l.Error("Fail to copy from local connection to remote stream", slogutil.Error(err))
// break out of the select below without waiting for the other copy to finish
close(localError)
}
Expand All @@ -88,6 +88,6 @@ func handleTCPConn(clientConn net.Conn, serverConn httpstream.Connection, dstAdd
// always expect something on errorChan (it may be nil)
err = <-errorChan
if err != nil {
klog.ErrorS(err, "Unexpected error from stream", kvs...)
l.Error("Unexpected error from stream", slogutil.Error(err))
}
}
Loading