diff --git a/_example/default/go.mod b/_example/default/go.mod index 453aece7..7232af15 100644 --- a/_example/default/go.mod +++ b/_example/default/go.mod @@ -4,6 +4,4 @@ go 1.19 require github.com/arl/statsviz v0.6.0 -require github.com/gorilla/websocket v1.5.0 // indirect - replace github.com/arl/statsviz => ../../ diff --git a/_example/default/go.sum b/_example/default/go.sum index 94a9e7b9..b580f820 100644 --- a/_example/default/go.sum +++ b/_example/default/go.sum @@ -1,5 +1,5 @@ -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= diff --git a/go.mod b/go.mod index b708ce7f..48bce6ea 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,12 @@ module github.com/arl/statsviz go 1.20 require ( - github.com/gorilla/websocket v1.5.0 + github.com/gorilla/websocket v1.5.1 github.com/rogpeppe/go-internal v1.11.0 ) require ( - golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect golang.org/x/tools v0.1.12 // indirect ) diff --git a/go.sum b/go.sum index a50c6502..cf245aff 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,10 @@ -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/internal/static/js/app.js b/internal/static/js/app.js index 72ba5b2c..23f353a1 100644 --- a/internal/static/js/app.js +++ b/internal/static/js/app.js @@ -3,15 +3,6 @@ import * as plot from "./plot.js"; import * as theme from "./theme.js"; import PlotsDef from './plotsdef.js'; -const buildWebsocketURI = () => { - var loc = window.location, - ws_prot = "ws:"; - if (loc.protocol === "https:") { - ws_prot = "wss:"; - } - return ws_prot + "//" + loc.host + loc.pathname + "ws" -} - const dataRetentionSeconds = 600; var timeout = 250; @@ -26,38 +17,45 @@ let paused = false; let show_gc = true; let timerange = 60; -/* WebSocket connection handling */ -const connect = () => { - const uri = buildWebsocketURI(); - let ws = new WebSocket(uri); - console.info(`Attempting websocket connection to server at ${uri}`); - - ws.onopen = () => { +const dataProcessor = { + initDone: false, + close: () => { + }, + connected: false, + retrying: false, + onopen: () => { + dataProcessor.initDone = false; + dataProcessor.connected = true; console.info("Successfully connected"); timeout = 250; // reset connection timeout for next time - }; - - ws.onclose = event => { - console.error(`Closed websocket connection: code ${event.code}`); - setTimeout(connect, clamp(timeout += timeout, 250, 5000)); - }; - - ws.onerror = err => { - console.error(`Websocket error, closing connection.`); - ws.close(); - }; - - let initDone = false; - ws.onmessage = event => { + }, + onclose: event => { + dataProcessor.connected = false; + console.error(`Closed connection: code ${event.code || event}`); + if (dataProcessor.retrying) { + return + } + dataProcessor.retrying = true + setTimeout(() => { + connect() + dataProcessor.retrying = false + }, clamp(timeout += timeout, 250, 5000)); + }, + onerror: err => { + console.error(`error, closing connection.`, err); + dataProcessor.close(); + }, + onmessage: event => { let data = JSON.parse(event.data) - - if (!initDone) { + if (!dataProcessor.initDone) { configurePlots(PlotsDef); stats.init(PlotsDef, dataRetentionSeconds); attachPlots(); - $('#play_pause').change(() => { paused = !paused; }); + $('#play_pause').change(() => { + paused = !paused; + }); $('#show_gc').change(() => { show_gc = !show_gc; updatePlots(); @@ -67,16 +65,27 @@ const connect = () => { timerange = val; updatePlots(); }); - initDone = true; - return; + dataProcessor.initDone = true; } - + dataProcessor.onData(data); + }, + onData: data => { stats.pushData(data); - if (paused) { + if (paused || !dataProcessor.connected) { return } - updatePlots(PlotsDef.events); + updatePlots() + } +} +/* WebSocket connection handling */ +const connect = () => { + const url = window.location.pathname + "ws"; + const eventSource = new EventSource(url); + console.info(`Attempting sse connection to server at ${url}`); + for (let event in dataProcessor) { + eventSource[event] = dataProcessor[event]; } + dataProcessor.close = eventSource.close } connect(); @@ -102,7 +111,31 @@ const attachPlots = () => { } } -const updatePlots = () => { +function throttle(func, delay) { + let initial = true; + let last = null; + let timer = null; + return function () { + const context = this; + const args = arguments; + if (initial) { + func.apply(context, args); + initial = false; + last = Date.now(); + } else { + clearTimeout(timer); + timer = setTimeout(function () { + const now = Date.now(); + if (now - last >= delay) { + func.apply(context, args); + last = now; + } + }, delay - (Date.now() - last)); + } + } +} + +const updatePlots = throttle(() => { // Create shapes. let shapes = new Map(); @@ -123,7 +156,7 @@ const updatePlots = () => { plot.update(xrange, data, shapes); } }); -} +}, PlotsDef.sendFrequency||1000) const updatePlotsLayout = () => { plots.forEach(plot => { @@ -139,7 +172,7 @@ theme.updateThemeMode(); $('#color_theme_sw').change(() => { const themeMode = theme.getThemeMode(); const newTheme = themeMode === "dark" && "light" || "dark"; - localStorage.setItem("theme-mode", newTheme); + localStorage.setItem("theme-mode", newTheme); theme.updateThemeMode(); updatePlotsLayout(); }); diff --git a/internal/static/js/stats.js b/internal/static/js/stats.js index 5e8c8809..ba938b84 100644 --- a/internal/static/js/stats.js +++ b/internal/static/js/stats.js @@ -92,4 +92,4 @@ const slice = (n) => { return sliced; } -export { init, pushData, slice }; \ No newline at end of file +export { init, pushData, slice }; diff --git a/statsviz.go b/statsviz.go index 1d1cff0d..0a6dec46 100644 --- a/statsviz.go +++ b/statsviz.go @@ -43,15 +43,15 @@ import ( "bytes" "encoding/json" "fmt" + "io" "net/http" "os" "path/filepath" "strconv" "strings" + "sync" "time" - "github.com/gorilla/websocket" - "github.com/arl/statsviz/internal/plot" "github.com/arl/statsviz/internal/static" ) @@ -82,7 +82,7 @@ func Register(mux *http.ServeMux, opts ...Option) error { // updates metrics data and provides two essential HTTP handlers: // - the Index handler serves Statsviz user interface, allowing you to // visualize runtime metrics on your browser. -// - The Ws handler establishes a WebSocket connection allowing the connected +// - The Ws handler establishes a data connection allowing the connected // browser to receive metrics updates from the server. // // The zero value is not a valid Server, use NewServer to create a valid one. @@ -91,6 +91,9 @@ type Server struct { root string // HTTP path root plots *plot.List // plots shown on the user interface userPlots []plot.UserPlot + + lock sync.Mutex + onData []func([]byte) error // all onData callbacks } // NewServer constructs a new Statsviz Server with the provided options, or the @@ -161,20 +164,34 @@ func (s *Server) Register(mux *http.ServeMux) { // intercept is a middleware that intercepts requests for plotsdef.js, which is // generated dynamically based on the plots configuration. Other requests are // forwarded as-is. -func intercept(h http.Handler, cfg *plot.Config) http.HandlerFunc { - buf := bytes.Buffer{} - buf.WriteString("export default ") - enc := json.NewEncoder(&buf) - enc.SetIndent("", " ") - if err := enc.Encode(cfg); err != nil { - panic("unexpected failure to encode plot definitions: " + err.Error()) +func intercept(h http.Handler, cfg *plot.Config, extraConfig map[string]any) http.HandlerFunc { + var plotsdefjs []byte + //Using parentheses helps gc + { + buf := bytes.Buffer{} + buf.WriteString("export default ") + enc := json.NewEncoder(&buf) + enc.SetIndent("", " ") + var encodeValue any = cfg + if len(extraConfig) > 0 { + encodeValue1 := map[string]any{ + "series": cfg.Series, + "events": cfg.Events, + } + for k, v := range extraConfig { + encodeValue1[k] = v + } + encodeValue = encodeValue1 + } + if err := enc.Encode(encodeValue); err != nil { + panic("unexpected failure to encode plot definitions: " + err.Error()) + } + buf.WriteString(";") + plotsdefjs = buf.Bytes() } - buf.WriteString(";") - plotsdefjs := buf.Bytes() - return func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "js/plotsdef.js" { - w.Header().Add("Content-Length", strconv.Itoa(buf.Len())) + w.Header().Add("Content-Length", strconv.Itoa(len(plotsdefjs))) w.Header().Add("Content-Type", "text/javascript; charset=utf-8") w.Write(plotsdefjs) return @@ -228,7 +245,9 @@ func assetsFS() http.FileSystem { func (s *Server) Index() http.HandlerFunc { prefix := strings.TrimSuffix(s.root, "/") + "/" assets := http.FileServer(assetsFS()) - handler := intercept(assets, s.plots.Config()) + handler := intercept(assets, s.plots.Config(), map[string]any{ + "sendFrequency": s.intv.Milliseconds(), + }) return http.StripPrefix(prefix, handler).ServeHTTP } @@ -238,46 +257,80 @@ func (s *Server) Index() http.HandlerFunc { // connection to the WebSocket protocol. func (s *Server) Ws() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, + if strings.Contains(r.Header.Get("Accept"), "/event-stream") { + // If the connection is initiated by an already open web UI + // (started by a previous process, for example), then plotsdef.js won't be + // requested. Call plots.Config() manually to ensure that s.plots internals + // are correctly initialized. + s.plots.Config() + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + s.startTransfer(w) + return } + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("This endpoint only supports text/event-stream requests")) + } +} - ws, err := upgrader.Upgrade(w, r, nil) +func (s *Server) startTransfer(w io.Writer) { + s.lock.Lock() + c := make(chan struct{}) + s.onData = append(s.onData, func(data []byte) error { + _, err := w.Write(data) if err != nil { - return + close(c) + return err } - defer ws.Close() - - // Ignore this error. This happens when the other end connection closes, - // for example. We can't handle it in any meaningful way anyways. - _ = s.sendStats(ws, s.intv) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + return nil + }) + if len(s.onData) == 1 { + go s.callData() } + s.lock.Unlock() + <-c } - -// sendStats sends runtime statistics over the WebSocket connection. -func (s *Server) sendStats(conn *websocket.Conn, frequency time.Duration) error { - tick := time.NewTicker(frequency) +func (s *Server) callData() { + tick := time.NewTicker(s.intv) defer tick.Stop() - - // If the WebSocket connection is initiated by an already open web UI - // (started by a previous process, for example), then plotsdef.js won't be - // requested. Call plots.Config() manually to ensure that s.plots internals - // are correctly initialized. - s.plots.Config() - for range tick.C { - w, err := conn.NextWriter(websocket.TextMessage) - if err != nil { - return err + buffer := bytes.Buffer{} + buffer.WriteString("data: ") + if err := s.plots.WriteValues(&buffer); err != nil { + //fmt.Println("Error plots.WriteValues:", err) + continue } - if err := s.plots.WriteValues(w); err != nil { - return err + buffer.WriteString("\n\n") + onData := s.onData + del := false + for i, f := range onData { + if err := f(buffer.Bytes()); err != nil { + del = true + onData[i] = nil + continue + } } - if err := w.Close(); err != nil { - return err + if del { + s.lock.Lock() + for i, f := range onData { + if f == nil { + onData = append(onData[:i], onData[i+1:]...) + i-- + } + } + s.onData = onData + s.lock.Unlock() } + s.lock.Lock() + if len(s.onData) == 0 { + s.lock.Unlock() + return + } + s.lock.Unlock() } - - panic("unreachable") } diff --git a/statsviz_test.go b/statsviz_test.go index 1e972f45..f4b73f04 100644 --- a/statsviz_test.go +++ b/statsviz_test.go @@ -1,7 +1,9 @@ package statsviz import ( + "bufio" "bytes" + "encoding/json" "io" "io/fs" "net/http" @@ -11,8 +13,6 @@ import ( "testing" "time" - "github.com/gorilla/websocket" - "github.com/arl/statsviz/internal/static" ) @@ -86,15 +86,26 @@ func testWs(t *testing.T, f http.Handler, URL string) { t.Fatal(err) } - u1.Scheme = "ws" u1.Path = u2.Path // Connect to the server - ws, _, err := websocket.DefaultDialer.Dial(u1.String(), nil) if err != nil { t.Fatalf("%v", err) } - defer ws.Close() + request, err := http.NewRequest(http.MethodGet, u1.String(), nil) + request.Header.Set("Accept", "text/event-stream") + resp, err := http.DefaultClient.Do(request) + if err != nil { + t.Fatalf("requset error %v", err) + return + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("http status %v, want %v", resp.StatusCode, http.StatusOK) + return + } + //sse data , readline to parse + reader := bufio.NewReader(resp.Body) // Check the content of 2 consecutive payloads. for i := 0; i < 2; i++ { @@ -105,10 +116,22 @@ func testWs(t *testing.T, f http.Handler, URL string) { Goroutines []uint64 `json:"goroutines"` SizeClasses []uint64 `json:"size-classes"` } - if err := ws.ReadJSON(&data); err != nil { + line, prefix, err := reader.ReadLine() + if err != nil { + t.Fatalf("failed reading line from websocket: %v", err) + return + } + if prefix { + t.Fatalf("line too long") + return + } + if !bytes.HasPrefix(line, []byte("data: ")) { + i-- + continue + } + if err := json.Unmarshal(line[5:], &data); err != nil { t.Fatalf("failed reading json from websocket: %v", err) } - // The time series must have one and only one element if len(data.Goroutines) != 1 { t.Errorf("len(goroutines) = %d, want 1", len(data.Goroutines)) @@ -134,7 +157,7 @@ func TestWsCantUpgrade(t *testing.T) { newServer(t).Ws()(w, req) if w.Result().StatusCode != http.StatusBadRequest { - t.Errorf("responded %v to %q with non-websocket-upgradable conn, want %v", w.Result().StatusCode, url, http.StatusBadRequest) + t.Errorf("responded %v to %q with non-upgradable conn, want %v", w.Result().StatusCode, url, http.StatusBadRequest) } } @@ -212,7 +235,7 @@ func Test_intercept(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "http://example.com/debug/statsviz/js/plotsdef.js", nil) srv := newServer(t) - intercept(srv.Index(), srv.plots.Config())(w, req) + intercept(srv.Index(), srv.plots.Config(), nil)(w, req) resp := w.Result() if resp.StatusCode != http.StatusOK {