diff --git a/_example/default/main.go b/_example/default/main.go index 670a6d10..84680fe3 100644 --- a/_example/default/main.go +++ b/_example/default/main.go @@ -14,7 +14,7 @@ func main() { go example.Work() // Register a Statsviz server on the default mux. - statsviz.Register(http.DefaultServeMux) + statsviz.Register(http.DefaultServeMux, statsviz.TransferMode("sse")) fmt.Println("Point your browser to http://localhost:8080/debug/statsviz/") log.Fatal(http.ListenAndServe(":8080", nil)) diff --git a/internal/static/js/app.js b/internal/static/js/app.js index b7c34491..e9d11769 100644 --- a/internal/static/js/app.js +++ b/internal/static/js/app.js @@ -26,38 +26,45 @@ let paused = false; let show_gc = true; let timerange = 60; -/* WebSocket connection handling */ -const connect = () => { - const uri = buildWebsocketURI(); - let ws = new WebSocket(uri); - console.info(`Attempting websocket connection to server at ${uri}`); - - ws.onopen = () => { +const dataProcessor = { + initDone: false, + close: () => { + }, + connected: false, + retrying: false, + onopen: () => { + dataProcessor.initDone = false; + dataProcessor.connected = true; console.info("Successfully connected"); timeout = 250; // reset connection timeout for next time - }; - - ws.onclose = event => { - console.error(`Closed websocket connection: code ${event.code}`); - setTimeout(connect, clamp(timeout += timeout, 250, 5000)); - }; - - ws.onerror = err => { - console.error(`Websocket error, closing connection.`); - ws.close(); - }; - - let initDone = false; - ws.onmessage = event => { + }, + onclose: event => { + dataProcessor.connected = false; + console.error(`Closed connection: code ${event.code || event}`); + if (dataProcessor.retrying) { + return + } + dataProcessor.retrying = true + setTimeout(() => { + connect() + dataProcessor.retrying=false + }, clamp(timeout += timeout, 250, 5000)); + }, + onerror: err => { + console.error(`error, closing connection.`, err); + dataProcessor.close(); + }, + onmessage: event => { let data = JSON.parse(event.data) - - if (!initDone) { + if (!dataProcessor.initDone) { configurePlots(PlotsDef); stats.init(PlotsDef, dataRetentionSeconds); attachPlots(); - $('#play_pause').change(() => { paused = !paused; }); + $('#play_pause').change(() => { + paused = !paused; + }); $('#show_gc').change(() => { show_gc = !show_gc; updatePlots(); @@ -68,12 +75,12 @@ const connect = () => { updatePlots(); }); setInterval(() => { - if (paused || ws.readyState !== WebSocket.OPEN) { + if (paused || !dataProcessor.connected) { return } updatePlots(PlotsDef.events); }, 1000) - initDone = true; + dataProcessor.initDone = true; stats.pushData(data); updatePlots(PlotsDef.events); return @@ -81,6 +88,26 @@ const connect = () => { stats.pushData(data); } } +/* WebSocket connection handling */ +const connect = () => { + if (PlotsDef.transferMode === "sse") { + const url = window.location.pathname + "ws"; + const eventSource = new EventSource(url); + console.info(`Attempting sse connection to server at ${url}`); + for (let event in dataProcessor) { + eventSource[event] = dataProcessor[event]; + } + dataProcessor.close = eventSource.close + } else { + const uri = buildWebsocketURI(); + let ws = new WebSocket(uri); + console.info(`Attempting websocket connection to server at ${uri}`); + for (let event in dataProcessor) { + ws[event] = dataProcessor[event]; + } + dataProcessor.close = ws.close + } +} connect(); diff --git a/statsviz.go b/statsviz.go index 1d1cff0d..911242bb 100644 --- a/statsviz.go +++ b/statsviz.go @@ -87,10 +87,11 @@ func Register(mux *http.ServeMux, opts ...Option) error { // // The zero value is not a valid Server, use NewServer to create a valid one. type Server struct { - intv time.Duration // interval between consecutive metrics emission - root string // HTTP path root - plots *plot.List // plots shown on the user interface - userPlots []plot.UserPlot + intv time.Duration // interval between consecutive metrics emission + root string // HTTP path root + plots *plot.List // plots shown on the user interface + userPlots []plot.UserPlot + transferMode string // transfer mode, default is "ws", can be "sse" } // NewServer constructs a new Statsviz Server with the provided options, or the @@ -143,6 +144,14 @@ func Root(path string) Option { } } +// TransferMode changes the transfer mode,support "ws" and "sse". +func TransferMode(mode string) Option { + return func(s *Server) error { + s.transferMode = mode + return nil + } +} + // TimeseriesPlot adds a new time series plot to Statsviz. This options can // be added multiple times. func TimeseriesPlot(tsp TimeSeriesPlot) Option { @@ -161,20 +170,34 @@ func (s *Server) Register(mux *http.ServeMux) { // intercept is a middleware that intercepts requests for plotsdef.js, which is // generated dynamically based on the plots configuration. Other requests are // forwarded as-is. -func intercept(h http.Handler, cfg *plot.Config) http.HandlerFunc { - buf := bytes.Buffer{} - buf.WriteString("export default ") - enc := json.NewEncoder(&buf) - enc.SetIndent("", " ") - if err := enc.Encode(cfg); err != nil { - panic("unexpected failure to encode plot definitions: " + err.Error()) +func intercept(h http.Handler, cfg *plot.Config, extraConfig map[string]any) http.HandlerFunc { + var plotsdefjs []byte + //Using parentheses helps gc + { + buf := bytes.Buffer{} + buf.WriteString("export default ") + enc := json.NewEncoder(&buf) + enc.SetIndent("", " ") + var encodeValue any = cfg + if len(extraConfig) > 0 { + encodeValue1 := map[string]any{ + "series": cfg.Series, + "events": cfg.Events, + } + for k, v := range extraConfig { + encodeValue1[k] = v + } + encodeValue = encodeValue1 + } + if err := enc.Encode(encodeValue); err != nil { + panic("unexpected failure to encode plot definitions: " + err.Error()) + } + buf.WriteString(";") + plotsdefjs = buf.Bytes() } - buf.WriteString(";") - plotsdefjs := buf.Bytes() - return func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "js/plotsdef.js" { - w.Header().Add("Content-Length", strconv.Itoa(buf.Len())) + w.Header().Add("Content-Length", strconv.Itoa(len(plotsdefjs))) w.Header().Add("Content-Type", "text/javascript; charset=utf-8") w.Write(plotsdefjs) return @@ -228,7 +251,9 @@ func assetsFS() http.FileSystem { func (s *Server) Index() http.HandlerFunc { prefix := strings.TrimSuffix(s.root, "/") + "/" assets := http.FileServer(assetsFS()) - handler := intercept(assets, s.plots.Config()) + handler := intercept(assets, s.plots.Config(), map[string]any{ + "transferMode": s.transferMode, + }) return http.StripPrefix(prefix, handler).ServeHTTP } @@ -238,6 +263,48 @@ func (s *Server) Index() http.HandlerFunc { // connection to the WebSocket protocol. func (s *Server) Ws() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Accept") == "text/event-stream" { + tick := time.NewTicker(s.intv) + defer tick.Stop() + + // If the WebSocket connection is initiated by an already open web UI + // (started by a previous process, for example), then plotsdef.js won't be + // requested. Call plots.Config() manually to ensure that s.plots internals + // are correctly initialized. + s.plots.Config() + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + f := func() bool { + // 尝试将事件写入HTTP响应 + buffer := bytes.Buffer{} + buffer.WriteString("data: ") + if err := s.plots.WriteValues(&buffer); err != nil { + //fmt.Println("Error plots.WriteValues:", err) + return true + } + buffer.WriteString("\n\n") + _, err := w.Write(buffer.Bytes()) + if err != nil { + // 发生错误,可能是连接断开 + //fmt.Println("Error writing to response:", err) + return true + } + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + return false + } + f() + // 在HTTP响应中循环写入事件 + for range tick.C { + if f() { + return + } + } + panic("unreachable") + } var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, diff --git a/statsviz_test.go b/statsviz_test.go index 1e972f45..51d7d781 100644 --- a/statsviz_test.go +++ b/statsviz_test.go @@ -212,7 +212,7 @@ func Test_intercept(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "http://example.com/debug/statsviz/js/plotsdef.js", nil) srv := newServer(t) - intercept(srv.Index(), srv.plots.Config())(w, req) + intercept(srv.Index(), srv.plots.Config(), nil)(w, req) resp := w.Result() if resp.StatusCode != http.StatusOK {