Skip to content

Commit

Permalink
perf: k8s to koko
Browse files Browse the repository at this point in the history
  • Loading branch information
feng626 committed Jun 24, 2024
1 parent d77933c commit 56ae91b
Show file tree
Hide file tree
Showing 28 changed files with 7,338 additions and 2,455 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ pnpm-debug.log*
demo
assets/
ui/dist/favicon.ico
ui/dist/index.html
ui/dist/index.html
docker-compose.yaml
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ require (
golang.org/x/term v0.21.0
golang.org/x/text v0.16.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.23.1
k8s.io/apimachinery v0.23.1
k8s.io/client-go v0.23.1
Expand Down Expand Up @@ -123,6 +122,7 @@ require (
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.30.0 // indirect
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
Expand Down
26 changes: 23 additions & 3 deletions pkg/httpd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@ type Client struct {

sync.Mutex

// 用于防抖处理
buffer bytes.Buffer
bufferMutex sync.Mutex
timer *time.Timer

KubernetesId string
Namespace string
Pod string
Container string
}

func (c *Client) WinCh() <-chan ssh.Window {
Expand All @@ -47,13 +53,26 @@ func (c *Client) Read(p []byte) (n int, err error) {

// 向客户端发送数据进行1毫秒的防抖处理
func (c *Client) Write(p []byte) (n int, err error) {
<<<<<<< HEAD

Check failure on line 56 in pkg/httpd/client.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected <<, expected } (typecheck)

Check failure on line 56 in pkg/httpd/client.go

View workflow job for this annotation

GitHub Actions / lint

expected statement, found '<<' (typecheck)

Check failure on line 56 in pkg/httpd/client.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected <<, expected }) (typecheck)
c.bufferMutex.Lock()
defer c.bufferMutex.Unlock()

c.buffer.Write(p)

if c.timer == nil {
c.timer = time.AfterFunc(time.Millisecond, c.flushBuffer)
=======

Check failure on line 64 in pkg/httpd/client.go

View workflow job for this annotation

GitHub Actions / lint

expected statement, found '==' (typecheck)
messageType := TerminalBinary
if c.KubernetesId != "" {
messageType = TerminalK8SBinary
}

msg := Message{
Id: c.Conn.Uuid,
Type: messageType,
Raw: p,
KubernetesId: c.KubernetesId,
>>>>>>> 4fd5b00 (perf: k8s to koko)

Check failure on line 75 in pkg/httpd/client.go

View workflow job for this annotation

GitHub Actions / lint

expected operand, found '>>' (typecheck)
}
return len(p), nil

Check failure on line 77 in pkg/httpd/client.go

View workflow job for this annotation

GitHub Actions / lint

missing ',' in composite literal (typecheck)
}
Expand Down Expand Up @@ -156,9 +175,10 @@ func (c *Client) HandleRoomEvent(event string, roomMsg *exchange.RoomMessage) {
return
}
var msg = Message{
Id: c.Conn.Uuid,
Type: msgType,
Data: msgData,
Id: c.Conn.Uuid,
Type: msgType,
Data: msgData,
KubernetesId: c.KubernetesId,
}
c.Conn.SendMessage(&msg)
}
16 changes: 15 additions & 1 deletion pkg/httpd/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ type Message struct {
Id string `json:"id"`
Type string `json:"type"`
Data string `json:"data"`
Raw []byte `json:"-"`
Raw []byte `json:"raw"`
Err string `json:"err"`

//Chat AI
Prompt string `json:"prompt"`
Interrupt bool `json:"interrupt"`

//K8S
KubernetesId string `json:"k8s_id"`
Namespace string `json:"namespace"`
Pod string `json:"pod"`
Container string `json:"container"`
}

const (
Expand Down Expand Up @@ -50,6 +56,12 @@ const (
TerminalError = "TERMINAL_ERROR"

MessageNotify = "MESSAGE_NOTIFY"

TerminalK8SInit = "TERMINAL_K8S_INIT"
TerminalK8STree = "TERMINAL_K8S_TREE"
TerminalK8SData = "TERMINAL_K8S_DATA"
TerminalK8SBinary = "TERMINAL_K8S_BINARY"
TerminalK8SResize = "TERMINAL_K8S_RESIZE"
)

type WindowSize struct {
Expand Down Expand Up @@ -97,6 +109,8 @@ const (
TargetTypeMonitor = "monitor"

TargetTypeShare = "share"

TargetTypeK8s = "k8s"
)

const (
Expand Down
126 changes: 99 additions & 27 deletions pkg/httpd/tty.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type tty struct {
backendClient *Client

shareInfo *ShareInfo

K8sClients map[string]*Client
}

func (h *tty) Name() string {
Expand All @@ -35,6 +37,11 @@ func (h *tty) CleanUp() {
if h.backendClient != nil {
_ = h.backendClient.Close()
}

for id, client := range h.K8sClients {
_ = client.Close()
delete(h.K8sClients, id)
}
}

func (h *tty) CheckValidation() error {
Expand Down Expand Up @@ -108,10 +115,47 @@ func (h *tty) HandleMessage(msg *Message) {
pty: ssh.Pty{Term: "xterm", Window: win},
}
h.wg.Add(1)
go h.proxy(&h.wg)
go h.proxy(&h.wg, h.backendClient)
return
case TerminalK8SInit:
if msg.Id != h.ws.Uuid {
logger.Errorf("Ws[%s] terminal initial unknown message id %s", h.ws.Uuid, msg.Id)
return
}

var connectInfo TerminalConnectData
err := json.Unmarshal([]byte(msg.Data), &connectInfo)
if err != nil {
logger.Errorf("Ws[%s] terminal initial message data unmarshal err: %s",
h.ws.Uuid, err)
return
}

win := ssh.Window{
Width: connectInfo.Cols,
Height: connectInfo.Rows,
}
userR, userW := io.Pipe()

KubernetesId := msg.KubernetesId
client := &Client{
WinChan: make(chan ssh.Window, 100), Conn: h.ws,
UserRead: userR, UserWrite: userW,
pty: ssh.Pty{Term: "xterm", Window: win},
KubernetesId: KubernetesId, Namespace: msg.Namespace,
Pod: msg.Pod, Container: msg.Container,
}

if h.K8sClients == nil {
h.K8sClients = make(map[string]*Client)
}
h.K8sClients[KubernetesId] = client
h.wg.Add(1)
go h.proxy(&h.wg, client)
return
}
if h.initialed {

if h.initialed || func() bool { _, ok := h.K8sClients[msg.KubernetesId]; return ok }() {
h.handleTerminalMessage(msg)
}
}
Expand All @@ -135,22 +179,13 @@ func (h *tty) sendSessionMessage(data string) {

func (h *tty) handleTerminalMessage(msg *Message) {
switch msg.Type {
case TerminalData:
h.backendClient.WriteData([]byte(msg.Data))
case TerminalBinary:
h.backendClient.WriteData(msg.Raw)
case TerminalResize:
var size WindowSize
err := json.Unmarshal([]byte(msg.Data), &size)
if err != nil {
logger.Errorf("Ws[%s] message(%s) data unmarshal err: %s", h.ws.Uuid,
msg.Type, msg.Data)
return
}
h.backendClient.SetWinSize(ssh.Window{
Width: size.Cols,
Height: size.Rows,
})
case TerminalData, TerminalBinary:
data := getDataBytes(msg)
h.backendClient.WriteData(data)
case TerminalResize, TerminalK8SResize:
h.handleResize(msg)
case TerminalK8SData, TerminalK8SBinary:
h.handleK8SMessage(msg)
case TerminalShare:
var shareData ShareRequestParams

Expand Down Expand Up @@ -198,12 +233,48 @@ func (h *tty) handleTerminalMessage(msg *Message) {
return
case CLOSE:
_ = h.backendClient.Close()
if k8sClient, ok := h.K8sClients[msg.KubernetesId]; ok {
_ = k8sClient.Close()
delete(h.K8sClients, msg.KubernetesId)
}
default:
logger.Infof("Ws[%s] handle unknown message(%s) data %s", h.ws.Uuid,
msg.Type, msg.Data)
}
}

func getDataBytes(msg *Message) []byte {
if msg.Type == TerminalData || msg.Type == TerminalK8SData {
return []byte(msg.Data)
}
return msg.Raw
}

func (h *tty) handleK8SMessage(msg *Message) {
if k8sClient, ok := h.K8sClients[msg.KubernetesId]; ok {
k8sClient.WriteData(getDataBytes(msg))
}
}

func (h *tty) handleResize(msg *Message) {
var size WindowSize
err := json.Unmarshal([]byte(msg.Data), &size)
if err != nil {
logger.Errorf("Ws[%s] message(%s) data unmarshal err: %s", h.ws.Uuid, msg.Type, msg.Data)
return
}
if msg.Type == TerminalResize {
h.backendClient.SetWinSize(ssh.Window{
Width: size.Cols,
Height: size.Rows,
})
} else if msg.Type == TerminalK8SResize {
if k8sClient, ok := h.K8sClients[msg.KubernetesId]; ok {
k8sClient.SetWinSize(ssh.Window{Width: size.Cols, Height: size.Rows})
}
}
}

func (h *tty) removeShareUser(query *RemoveSharingUserParams) {
if room := exchange.GetRoom(query.SessionId); room != nil {
var data = make(map[string]interface{})
Expand Down Expand Up @@ -322,11 +393,10 @@ func (h *tty) ValidateShareParams(shareId, code string) (info ShareInfo, err err
return ShareInfo{recordRes}, nil
}

func (h *tty) getK8sContainerInfo() *proxy.ContainerInfo {
params := h.ws.wsParams
pod := params.Pod
namespace := params.Namespace
container := params.Container
func (h *tty) getK8sContainerInfo(client *Client) *proxy.ContainerInfo {
pod := client.Pod
namespace := client.Namespace
container := client.Container
if pod == "" || namespace == "" || container == "" {
return nil
}
Expand All @@ -350,7 +420,7 @@ func (h *tty) getConnectionParams() *proxy.ConnectionParams {
return &params
}

func (h *tty) proxy(wg *sync.WaitGroup) {
func (h *tty) proxy(wg *sync.WaitGroup, client *Client) {
defer wg.Done()
params := h.ws.wsParams
switch params.TargetType {
Expand All @@ -365,8 +435,8 @@ func (h *tty) proxy(wg *sync.WaitGroup) {
proxyOpts = append(proxyOpts, proxy.ConnectTokenAuthInfo(connectToken))
proxyOpts = append(proxyOpts, proxy.ConnectI18nLang(h.ws.langCode))
proxyOpts = append(proxyOpts, proxy.ConnectParams(h.getConnectionParams()))
proxyOpts = append(proxyOpts, proxy.ConnectContainer(h.getK8sContainerInfo()))
srv, err := proxy.NewServer(h.backendClient, h.ws.apiClient, proxyOpts...)
proxyOpts = append(proxyOpts, proxy.ConnectContainer(h.getK8sContainerInfo(client)))
srv, err := proxy.NewServer(client, h.ws.apiClient, proxyOpts...)
if err != nil {
logger.Errorf("Create proxy server failed: %s", err)
h.sendCloseMessage()
Expand All @@ -378,7 +448,9 @@ func (h *tty) proxy(wg *sync.WaitGroup) {
}
srv.Proxy()
}
h.sendCloseMessage()
if params.TargetType != TargetTypeK8s {
h.sendCloseMessage()
}
logger.Info("Ws tty proxy end")
}

Expand Down
Loading

0 comments on commit 56ae91b

Please sign in to comment.