Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor server and proxy and remove gnet/v2 #344

Merged
merged 35 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
d094e0c
Replace gnet/v2 engine API with one that uses net stdlib
mostafa Oct 7, 2023
4fc692a
Refactor server, proxy and client to adapt to the new API
mostafa Oct 7, 2023
ad69ae8
Remove unused Server options
mostafa Oct 7, 2023
f89cfde
Fix tests
mostafa Oct 7, 2023
5cb26b2
Fix linter errors
mostafa Oct 8, 2023
57bc655
Fix nil error and wrap the error
mostafa Oct 8, 2023
a219b40
Prevent race using a RWMutex
mostafa Oct 9, 2023
74856a0
Fix server test
mostafa Oct 9, 2023
9defae5
Update dependencies
mostafa Oct 13, 2023
3909b20
Use an atomic boolean as a simple state machine for controlling Engin…
mostafa Oct 13, 2023
004608a
Use a mutex to read/update server.Status
mostafa Oct 13, 2023
fcebc7b
Use separate metrics for proxy passthroughs (to-server, to-client) an…
mostafa Oct 13, 2023
38fa88a
Use an atomic bool to check if the client is connected or not before …
mostafa Oct 13, 2023
1548fd2
Pass all the variables into the goroutine to prevent data races
mostafa Oct 13, 2023
3cb8719
Update tests to reflect the changes
mostafa Oct 13, 2023
020e63f
Add Reconnect method to recycle the connection
mostafa Oct 13, 2023
22a5d09
Fix connection close issue
mostafa Oct 13, 2023
fb9fdab
Internalize net.Conn in Client struct to prevent direct access to met…
mostafa Oct 15, 2023
e549e58
Limit run time of integration test
mostafa Oct 15, 2023
f02a31f
Add test for Reconnect
mostafa Oct 15, 2023
8013cb9
Fix metrics
mostafa Oct 15, 2023
3e734cb
Remove unused variable
mostafa Oct 15, 2023
e13389a
Fix connection hanging
mostafa Oct 15, 2023
f201736
Check if the value is nil before putting it into the pool
mostafa Oct 15, 2023
fa790c6
Check if connection exists before trying to close it
mostafa Oct 15, 2023
068f03c
Close channel before returning
mostafa Oct 15, 2023
91998ea
Move Run function to server.Run
mostafa Oct 15, 2023
f1c70bf
Add a NewEngine function to create a new instance of the Engine struct
mostafa Oct 15, 2023
f377c83
Add test for Engine
mostafa Oct 15, 2023
91efb66
Use background context instead of Cmd.Context to prevent races
mostafa Oct 15, 2023
ac85264
Close listener after use
mostafa Oct 15, 2023
909a23c
Don't send on a closed channel
mostafa Oct 15, 2023
b009dc1
Renew timeout of plugin context
mostafa Oct 15, 2023
4399644
Use a stack data structure to push and restore the last request
mostafa Oct 15, 2023
fc41aed
Update dependencies
mostafa Oct 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ jobs:
name: Test GatewayD Plugins
runs-on: ubuntu-latest
needs: test
timeout-minutes: 3
services:
postgres:
image: postgres
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dist/

# Editor files
.vscode
.idea

# Logs
*.log
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ clean:
test:
@go test -v ./...

test-race:
@go test -race -v ./...

benchmark:
@go test -bench=. -benchmem -run=^# ./...

Expand Down
78 changes: 35 additions & 43 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
usage "github.com/gatewayd-io/gatewayd/usagereport/v1"
"github.com/getsentry/sentry-go"
"github.com/go-co-op/gocron"
"github.com/panjf2000/gnet/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -71,7 +70,6 @@ var (

func StopGracefully(
runCtx context.Context,
pluginTimeoutCtx context.Context,
sig os.Signal,
metricsMerger *metrics.Merger,
metricsServer *http.Server,
Expand All @@ -88,6 +86,10 @@ func StopGracefully(

logger.Info().Msg("Notifying the plugins that the server is shutting down")
if pluginRegistry != nil {
pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), conf.Plugin.Timeout)
defer cancel()

//nolint:contextcheck
_, err := pluginRegistry.Run(
pluginTimeoutCtx,
map[string]interface{}{"signal": signal},
Expand All @@ -99,11 +101,12 @@ func StopGracefully(
}
}

logger.Info().Msg("Stopping GatewayD")
span.AddEvent("Stopping GatewayD", trace.WithAttributes(
logger.Info().Msg("GatewayD is shutting down")
span.AddEvent("GatewayD is shutting down", trace.WithAttributes(
attribute.String("signal", signal),
))
if healthCheckScheduler != nil {
healthCheckScheduler.Stop()
healthCheckScheduler.Clear()
logger.Info().Msg("Stopped health check scheduler")
span.AddEvent("Stopped health check scheduler")
Expand Down Expand Up @@ -269,7 +272,7 @@ var runCmd = &cobra.Command{
startDelay := time.Now().Add(conf.Plugin.HealthCheckPeriod)
if _, err := healthCheckScheduler.Every(
conf.Plugin.HealthCheckPeriod).SingletonMode().StartAt(startDelay).Do(func() {
_, span = otel.Tracer(config.TracerName).Start(ctx, "Run plugin health check")
_, span := otel.Tracer(config.TracerName).Start(ctx, "Run plugin health check")
defer span.End()

var plugins []string
Expand Down Expand Up @@ -461,6 +464,9 @@ var runCmd = &cobra.Command{
}(conf.Global.Metrics[config.Default], logger)

// This is a notification hook, so we don't care about the result.
pluginTimeoutCtx, cancel = context.WithTimeout(context.Background(), conf.Plugin.Timeout)
defer cancel()

if data, ok := conf.GlobalKoanf.Get("loggers").(map[string]interface{}); ok {
_, err = pluginRegistry.Run(
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_LOGGER)
Expand Down Expand Up @@ -527,6 +533,10 @@ var runCmd = &cobra.Command{

span.AddEvent("Create client", eventOptions)

pluginTimeoutCtx, cancel = context.WithTimeout(
context.Background(), conf.Plugin.Timeout)
defer cancel()

clientCfg := map[string]interface{}{
"id": client.ID,
"network": client.Network,
Expand Down Expand Up @@ -571,6 +581,10 @@ var runCmd = &cobra.Command{
os.Exit(gerr.FailedToInitializePool)
}

pluginTimeoutCtx, cancel = context.WithTimeout(
context.Background(), conf.Plugin.Timeout)
defer cancel()

_, err = pluginRegistry.Run(
pluginTimeoutCtx,
map[string]interface{}{"name": name, "size": cfg.GetSize()},
Expand Down Expand Up @@ -610,6 +624,10 @@ var runCmd = &cobra.Command{
attribute.String("healthCheckPeriod", cfg.HealthCheckPeriod.String()),
))

pluginTimeoutCtx, cancel = context.WithTimeout(
context.Background(), conf.Plugin.Timeout)
defer cancel()

if data, ok := conf.GlobalKoanf.Get("proxies").(map[string]interface{}); ok {
_, err = pluginRegistry.Run(
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_PROXY)
Expand All @@ -633,30 +651,9 @@ var runCmd = &cobra.Command{
cfg.Network,
cfg.Address,
cfg.GetTickInterval(),
[]gnet.Option{
// Scheduling options
gnet.WithMulticore(cfg.MultiCore),
gnet.WithLockOSThread(cfg.LockOSThread),
// NumEventLoop overrides Multicore option.
// gnet.WithNumEventLoop(1),

network.Option{
// Can be used to send keepalive messages to the client.
gnet.WithTicker(cfg.EnableTicker),

// Internal event-loop load balancing options
gnet.WithLoadBalancing(cfg.GetLoadBalancer()),

// Buffer options
gnet.WithReadBufferCap(cfg.ReadBufferCap),
gnet.WithWriteBufferCap(cfg.WriteBufferCap),
gnet.WithSocketRecvBuffer(cfg.SocketRecvBuffer),
gnet.WithSocketSendBuffer(cfg.SocketSendBuffer),

// TCP options
gnet.WithReuseAddr(cfg.ReuseAddress),
gnet.WithReusePort(cfg.ReusePort),
gnet.WithTCPKeepAlive(cfg.TCPKeepAlive),
gnet.WithTCPNoDelay(cfg.GetTCPNoDelay()),
EnableTicker: cfg.EnableTicker,
},
proxies[name],
logger,
Expand All @@ -669,21 +666,13 @@ var runCmd = &cobra.Command{
attribute.String("network", cfg.Network),
attribute.String("address", cfg.Address),
attribute.String("tickInterval", cfg.TickInterval.String()),
attribute.Bool("multiCore", cfg.MultiCore),
attribute.Bool("lockOSThread", cfg.LockOSThread),
attribute.Bool("enableTicker", cfg.EnableTicker),
attribute.String("loadBalancer", cfg.LoadBalancer),
attribute.Int("readBufferCap", cfg.ReadBufferCap),
attribute.Int("writeBufferCap", cfg.WriteBufferCap),
attribute.Int("socketRecvBuffer", cfg.SocketRecvBuffer),
attribute.Int("socketSendBuffer", cfg.SocketSendBuffer),
attribute.Bool("reuseAddress", cfg.ReuseAddress),
attribute.Bool("reusePort", cfg.ReusePort),
attribute.String("tcpKeepAlive", cfg.TCPKeepAlive.String()),
attribute.Bool("tcpNoDelay", cfg.TCPNoDelay),
attribute.String("pluginTimeout", conf.Plugin.Timeout.String()),
))

pluginTimeoutCtx, cancel = context.WithTimeout(
context.Background(), conf.Plugin.Timeout)
defer cancel()

if data, ok := conf.GlobalKoanf.Get("servers").(map[string]interface{}); ok {
_, err = pluginRegistry.Run(
pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_NEW_SERVER)
Expand Down Expand Up @@ -786,13 +775,15 @@ var runCmd = &cobra.Command{
go func(pluginRegistry *plugin.Registry,
logger zerolog.Logger,
servers map[string]*network.Server,
metricsMerger *metrics.Merger,
metricsServer *http.Server,
stopChan chan struct{},
) {
for sig := range signalsCh {
for _, s := range signals {
if sig != s {
StopGracefully(
runCtx,
pluginTimeoutCtx,
sig,
metricsMerger,
metricsServer,
Expand All @@ -805,13 +796,14 @@ var runCmd = &cobra.Command{
}
}
}
}(pluginRegistry, logger, servers)
}(pluginRegistry, logger, servers, metricsMerger, metricsServer, stopChan)

_, span = otel.Tracer(config.TracerName).Start(runCtx, "Start servers")
// Start the server.
for name, server := range servers {
logger := loggers[name]
go func(
span trace.Span,
server *network.Server,
logger zerolog.Logger,
healthCheckScheduler *gocron.Scheduler,
Expand All @@ -831,7 +823,7 @@ var runCmd = &cobra.Command{
pluginRegistry.Shutdown()
os.Exit(gerr.FailedToStartServer)
}
}(server, logger, healthCheckScheduler, metricsMerger, pluginRegistry)
}(span, server, logger, healthCheckScheduler, metricsMerger, pluginRegistry)
}
span.End()

Expand Down
12 changes: 5 additions & 7 deletions cmd/run_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"os"
"sync"
"testing"
Expand Down Expand Up @@ -29,8 +30,7 @@ func Test_runCmd(t *testing.T) {
time.Sleep(100 * time.Millisecond)

StopGracefully(
runCmd.Context(),
runCmd.Context(),
context.Background(),
nil,
nil,
nil,
Expand Down Expand Up @@ -88,8 +88,7 @@ func Test_runCmdWithMultiTenancy(t *testing.T) {
time.Sleep(500 * time.Millisecond)

StopGracefully(
runCmd.Context(),
runCmd.Context(),
context.Background(),
nil,
nil,
nil,
Expand Down Expand Up @@ -165,11 +164,10 @@ func Test_runCmdWithCachePlugin(t *testing.T) {
var waitGroup sync.WaitGroup
waitGroup.Add(1)
go func(waitGroup *sync.WaitGroup) {
time.Sleep(500 * time.Millisecond)
time.Sleep(time.Second)

StopGracefully(
runCmd.Context(),
runCmd.Context(),
context.Background(),
nil,
nil,
nil,
Expand Down
19 changes: 4 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,10 @@ func (c *Config) LoadDefaults(ctx context.Context) {
}

defaultServer := Server{
Network: DefaultListenNetwork,
Address: DefaultListenAddress,
EnableTicker: false,
TickInterval: DefaultTickInterval,
MultiCore: true,
LockOSThread: false,
ReuseAddress: true,
ReusePort: true,
LoadBalancer: DefaultLoadBalancer,
ReadBufferCap: DefaultBufferSize,
WriteBufferCap: DefaultBufferSize,
SocketRecvBuffer: DefaultBufferSize,
SocketSendBuffer: DefaultBufferSize,
TCPKeepAlive: DefaultTCPKeepAliveDuration,
TCPNoDelay: DefaultTCPNoDelay,
Network: DefaultListenNetwork,
Address: DefaultListenAddress,
EnableTicker: false,
TickInterval: DefaultTickInterval,
}

c.globalDefaults = GlobalConfig{
Expand Down
1 change: 1 addition & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ const (
DefaultTCPKeepAliveDuration = 3 * time.Second
DefaultLoadBalancer = "roundrobin"
DefaultTCPNoDelay = true
DefaultEngineStopTimeout = 5 * time.Second

// Utility constants.
DefaultSeed = 1000
Expand Down
23 changes: 0 additions & 23 deletions config/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"path/filepath"
"time"

"github.com/panjf2000/gnet/v2"
"github.com/rs/zerolog"
)

Expand All @@ -28,11 +27,6 @@ var (
"continue": Continue,
"stop": Stop,
}
loadBalancers = map[string]gnet.LoadBalancing{
"roundrobin": gnet.RoundRobin,
"leastconnections": gnet.LeastConnections,
"sourceaddrhash": gnet.SourceAddrHash,
}
logOutputs = map[string]LogOutput{
"console": Console,
"stdout": Stdout,
Expand Down Expand Up @@ -166,23 +160,6 @@ func (s Server) GetTickInterval() time.Duration {
return s.TickInterval
}

// GetLoadBalancer returns the load balancing algorithm to use.
func (s Server) GetLoadBalancer() gnet.LoadBalancing {
if lb, ok := loadBalancers[s.LoadBalancer]; ok {
return lb
}
return gnet.RoundRobin
}

// GetTCPNoDelay returns the TCP no delay option from config file.
func (s Server) GetTCPNoDelay() gnet.TCPSocketOpt {
if s.TCPNoDelay {
return gnet.TCPNoDelay
}

return gnet.TCPDelay
}

// GetSize returns the pool size from config file.
func (p Pool) GetSize() int {
if p.Size == 0 {
Expand Down
13 changes: 0 additions & 13 deletions config/getters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"testing"
"time"

"github.com/panjf2000/gnet/v2"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -75,18 +74,6 @@ func TestGetTickInterval(t *testing.T) {
assert.Equal(t, DefaultTickInterval, server.GetTickInterval())
}

// TestGetLoadBalancer tests the GetLoadBalancer function.
func TestGetLoadBalancer(t *testing.T) {
server := Server{}
assert.Equal(t, gnet.RoundRobin, server.GetLoadBalancer())
}

// TestGetTCPNoDelay tests the GetTCPNoDelay function.
func TestGetTCPNoDelay(t *testing.T) {
server := Server{}
assert.Equal(t, gnet.TCPDelay, server.GetTCPNoDelay())
}

// TestGetSize tests the GetSize function.
func TestGetSize(t *testing.T) {
pool := Pool{}
Expand Down
Loading