Skip to content

Commit

Permalink
Merge pull request #509 from TarsCloud/perf/lbbniu/transport
Browse files Browse the repository at this point in the history
perf(transport): udp server handle supports coroutine pools
  • Loading branch information
lbbniu committed Jan 13, 2024
2 parents f016edd + 0277e02 commit cbc345d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 20 deletions.
2 changes: 1 addition & 1 deletion tars/transport/tcphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (t *tcpHandler) getConnContext(connSt *connInfo) context.Context {
func (t *tcpHandler) handleConn(connSt *connInfo, pkg []byte) {
// recvPkgTs are more accurate
ctx := t.getConnContext(connSt)
atomic.AddInt32(&connSt.numInvoke, 1)
handler := func() {
defer atomic.AddInt32(&connSt.numInvoke, -1)
rsp := t.server.invoke(ctx, pkg)
Expand Down Expand Up @@ -262,7 +263,6 @@ func (t *tcpHandler) recv(connSt *connInfo) {
break
}
if status == PackageFull {
atomic.AddInt32(&connSt.numInvoke, 1)
pkg := make([]byte, pkgLen)
copy(pkg, currBuffer[:pkgLen])
currBuffer = currBuffer[pkgLen:]
Expand Down
57 changes: 38 additions & 19 deletions tars/transport/udphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync/atomic"
"time"

"github.com/TarsCloud/TarsGo/tars/util/gpool"

"github.com/TarsCloud/TarsGo/tars/protocol/res/basef"
"github.com/TarsCloud/TarsGo/tars/util/current"
"github.com/TarsCloud/TarsGo/tars/util/grace"
Expand All @@ -17,6 +19,7 @@ type udpHandler struct {
server *TarsServer

conn *net.UDPConn
pool *gpool.Pool
}

func (u *udpHandler) Listen() (err error) {
Expand All @@ -26,6 +29,11 @@ func (u *udpHandler) Listen() (err error) {
return err
}
TLOG.Info("UDP listen", u.conn.LocalAddr())

// init goroutine pool
if cfg.MaxInvoke > 0 {
u.pool = gpool.NewPool(int(cfg.MaxInvoke), cfg.QueueCap)
}
return nil
}

Expand All @@ -38,6 +46,35 @@ func (u *udpHandler) getConnContext(udpAddr *net.UDPAddr) context.Context {
return ctx
}

func (u *udpHandler) handleUDPAddr(udpAddr *net.UDPAddr, pkg []byte) {
ctx := u.getConnContext(udpAddr)
atomic.AddInt32(&u.server.numInvoke, 1)
handler := func() {
defer atomic.AddInt32(&u.server.numInvoke, -1)
rsp := u.server.invoke(ctx, pkg) // no need to check package

cPacketType, ok := current.GetPacketTypeFromContext(ctx)
if !ok {
TLOG.Error("Failed to GetPacketTypeFromContext")
}

if cPacketType == basef.TARSONEWAY {
return
}

if _, err := u.conn.WriteToUDP(rsp, udpAddr); err != nil {
TLOG.Errorf("send pkg to %v failed %v", udpAddr, err)
}
}

cfg := u.config
if cfg.MaxInvoke > 0 { // use goroutine pool
u.pool.JobQueue <- handler
} else {
go handler()
}
}

func (u *udpHandler) Handle() error {
atomic.AddInt32(&u.server.numConn, 1)
// wait invoke done
Expand Down Expand Up @@ -67,25 +104,7 @@ func (u *udpHandler) Handle() error {
}
pkg := make([]byte, n)
copy(pkg, buffer[0:n])
ctx := u.getConnContext(udpAddr)
go func() {
atomic.AddInt32(&u.server.numInvoke, 1)
defer atomic.AddInt32(&u.server.numInvoke, -1)
rsp := u.server.invoke(ctx, pkg) // no need to check package

cPacketType, ok := current.GetPacketTypeFromContext(ctx)
if !ok {
TLOG.Error("Failed to GetPacketTypeFromContext")
}

if cPacketType == basef.TARSONEWAY {
return
}

if _, err := u.conn.WriteToUDP(rsp, udpAddr); err != nil {
TLOG.Errorf("send pkg to %v failed %v", udpAddr, err)
}
}()
u.handleUDPAddr(udpAddr, pkg)
}
}

Expand Down

0 comments on commit cbc345d

Please sign in to comment.