Skip to content

Commit

Permalink
optimize write peer
Browse files Browse the repository at this point in the history
  • Loading branch information
ICKelin committed Aug 15, 2021
1 parent 336d758 commit e6de1ab
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 38 deletions.
53 changes: 19 additions & 34 deletions edge/cframe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
)

type Server struct {
registry *Registry

// secret
key string

Expand All @@ -44,9 +42,6 @@ type sendReq struct {

type peerConn struct {
addr string
// conn *net.UDPConn
// conn *kcp.UDPSession
// conn net.Conn
conn *smux.Stream
cidr string
}
Expand All @@ -60,25 +55,21 @@ func NewServer(laddr, key string, iface *Interface) *Server {
sndq: make([]chan *sendReq, 1000),
}

go s.readLocal()
for i := 0; i < 1000; i++ {
s.sndq[i] = make(chan *sendReq, 10000)
go s.writePeer(s.sndq[i])
}
return s
}

func (s *Server) SetRegistry(r *Registry) {
s.registry = r
}

func (s *Server) SetVPCInstance(vpcInstance vpc.IVPC) {
if s.vpcInstance == nil {
s.vpcInstance = vpcInstance
}
}

func (s *Server) ListenAndServe() error {
go s.readLocal()
lis, err := net.Listen("tcp", s.laddr)
if err != nil {
return err
Expand Down Expand Up @@ -185,18 +176,8 @@ func (s *Server) readLocal() {
select {
case s.sndq[idx] <- &sendReq{buf, peer}:
default:
peer.SetWriteDeadline(time.Now().Add(time.Second * 3))
nw, err := peer.Write(buf)
peer.SetWriteDeadline(time.Time{})
if err != nil {
log.Error("write to peer %s fail %v", dst, err)
continue
}

if nw != len(buf) {
log.Error("stream write not full")
continue
}
log.Warn("sndq[%d] is full", idx)
s.write(buf, peer)
}
}
}
Expand All @@ -205,18 +186,22 @@ func (s *Server) writePeer(sndq chan *sendReq) {
for req := range sndq {
peer := req.conn
buf := req.buf
peer.SetWriteDeadline(time.Now().Add(time.Second * 3))
nw, err := peer.Write(buf)
peer.SetWriteDeadline(time.Time{})
if err != nil {
log.Error("write to peer fail %v", err)
continue
}
s.write(buf, peer)
}
}

if nw != len(buf) {
log.Error("stream write not full")
continue
}
func (s *Server) write(buf []byte, peer net.Conn) {
peer.SetWriteDeadline(time.Now().Add(time.Second * 3))
nw, err := peer.Write(buf)
peer.SetWriteDeadline(time.Time{})
if err != nil {
log.Error("write to peer fail %v", err)
return
}

if nw != len(buf) {
log.Error("stream write not full")
return
}
}

Expand Down Expand Up @@ -337,7 +322,7 @@ func (s *Server) addRoute(peer *codec.Edge) error {
}

func (s *Server) deadlineCheck(peer *codec.Edge, sess *smux.Session) {
tick := time.NewTicker(time.Second * 5)
tick := time.NewTicker(time.Second * 1)
for range tick.C {
if !sess.IsClosed() {
continue
Expand Down
6 changes: 2 additions & 4 deletions edge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
func main() {
logLevel := os.Getenv("LOG_LEVEL")
if len(logLevel) == 0 {
logLevel = "debug"
logLevel = "info"
}
log.Init("log/edge.log", logLevel, 3)
log.Init("edge.log", logLevel, 3)

iface, err := NewInterface()
if err != nil {
Expand Down Expand Up @@ -61,7 +61,6 @@ func main() {
if len(ns) <= 0 {
log.Info("use default namespace")
ns = "default"
return
}

s := NewServer(lisAddr, secret, iface)
Expand All @@ -75,6 +74,5 @@ func main() {
}
}()

s.SetRegistry(reg)
s.ListenAndServe()
}

0 comments on commit e6de1ab

Please sign in to comment.