Skip to content

Commit

Permalink
feat: support sse transfer mode #117
Browse files Browse the repository at this point in the history
  • Loading branch information
mzzsfy committed Jan 20, 2024
1 parent 5cd7e6a commit bdbbc1f
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 110 deletions.
2 changes: 0 additions & 2 deletions _example/default/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@ go 1.19

require github.com/arl/statsviz v0.6.0

require github.com/gorilla/websocket v1.5.0 // indirect

replace github.com/arl/statsviz => ../../
6 changes: 3 additions & 3 deletions _example/default/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module github.com/arl/statsviz
go 1.20

require (
github.com/gorilla/websocket v1.5.0
github.com/rogpeppe/go-internal v1.11.0
)

require (
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/tools v0.1.12 // indirect
)
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
115 changes: 74 additions & 41 deletions internal/static/js/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,6 @@ import * as plot from "./plot.js";
import * as theme from "./theme.js";
import PlotsDef from './plotsdef.js';

const buildWebsocketURI = () => {
var loc = window.location,
ws_prot = "ws:";
if (loc.protocol === "https:") {
ws_prot = "wss:";
}
return ws_prot + "//" + loc.host + loc.pathname + "ws"
}

const dataRetentionSeconds = 600;
var timeout = 250;

Expand All @@ -26,38 +17,45 @@ let paused = false;
let show_gc = true;
let timerange = 60;

/* WebSocket connection handling */
const connect = () => {
const uri = buildWebsocketURI();
let ws = new WebSocket(uri);
console.info(`Attempting websocket connection to server at ${uri}`);

ws.onopen = () => {
const dataProcessor = {
initDone: false,
close: () => {
},
connected: false,
retrying: false,
onopen: () => {
dataProcessor.initDone = false;
dataProcessor.connected = true;
console.info("Successfully connected");
timeout = 250; // reset connection timeout for next time
};

ws.onclose = event => {
console.error(`Closed websocket connection: code ${event.code}`);
setTimeout(connect, clamp(timeout += timeout, 250, 5000));
};

ws.onerror = err => {
console.error(`Websocket error, closing connection.`);
ws.close();
};

let initDone = false;
ws.onmessage = event => {
},
onclose: event => {
dataProcessor.connected = false;
console.error(`Closed connection: code ${event.code || event}`);
if (dataProcessor.retrying) {
return
}
dataProcessor.retrying = true
setTimeout(() => {
connect()
dataProcessor.retrying = false
}, clamp(timeout += timeout, 250, 5000));
},
onerror: err => {
console.error(`error, closing connection.`, err);
dataProcessor.close();
},
onmessage: event => {
let data = JSON.parse(event.data)

if (!initDone) {
if (!dataProcessor.initDone) {
configurePlots(PlotsDef);
stats.init(PlotsDef, dataRetentionSeconds);

attachPlots();

$('#play_pause').change(() => { paused = !paused; });
$('#play_pause').change(() => {
paused = !paused;
});
$('#show_gc').change(() => {
show_gc = !show_gc;
updatePlots();
Expand All @@ -67,16 +65,27 @@ const connect = () => {
timerange = val;
updatePlots();
});
initDone = true;
return;
dataProcessor.initDone = true;
}

dataProcessor.onData(data);
},
onData: data => {
stats.pushData(data);
if (paused) {
if (paused || !dataProcessor.connected) {
return
}
updatePlots(PlotsDef.events);
updatePlots()
}
}
/* WebSocket connection handling */
const connect = () => {
const url = window.location.pathname + "ws";
const eventSource = new EventSource(url);
console.info(`Attempting sse connection to server at ${url}`);
for (let event in dataProcessor) {
eventSource[event] = dataProcessor[event];
}
dataProcessor.close = eventSource.close
}

connect();
Expand All @@ -102,7 +111,31 @@ const attachPlots = () => {
}
}

const updatePlots = () => {
function throttle(func, delay) {
let initial = true;
let last = null;
let timer = null;
return function () {
const context = this;
const args = arguments;
if (initial) {
func.apply(context, args);
initial = false;
last = Date.now();
} else {
clearTimeout(timer);
timer = setTimeout(function () {
const now = Date.now();
if (now - last >= delay) {
func.apply(context, args);
last = now;
}
}, delay - (Date.now() - last));
}
}
}

const updatePlots = throttle(() => {
// Create shapes.
let shapes = new Map();

Expand All @@ -123,7 +156,7 @@ const updatePlots = () => {
plot.update(xrange, data, shapes);
}
});
}
}, PlotsDef.sendFrequency||1000)

const updatePlotsLayout = () => {
plots.forEach(plot => {
Expand All @@ -139,7 +172,7 @@ theme.updateThemeMode();
$('#color_theme_sw').change(() => {
const themeMode = theme.getThemeMode();
const newTheme = themeMode === "dark" && "light" || "dark";
localStorage.setItem("theme-mode", newTheme);
localStorage.setItem("theme-mode", newTheme);
theme.updateThemeMode();
updatePlotsLayout();
});
2 changes: 1 addition & 1 deletion internal/static/js/stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,4 @@ const slice = (n) => {
return sliced;
}

export { init, pushData, slice };
export { init, pushData, slice };
116 changes: 68 additions & 48 deletions statsviz.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,14 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/gorilla/websocket"

"github.com/arl/statsviz/internal/plot"
"github.com/arl/statsviz/internal/static"
)
Expand Down Expand Up @@ -82,7 +81,7 @@ func Register(mux *http.ServeMux, opts ...Option) error {
// updates metrics data and provides two essential HTTP handlers:
// - the Index handler serves Statsviz user interface, allowing you to
// visualize runtime metrics on your browser.
// - The Ws handler establishes a WebSocket connection allowing the connected
// - The Ws handler establishes a data connection allowing the connected
// browser to receive metrics updates from the server.
//
// The zero value is not a valid Server, use NewServer to create a valid one.
Expand Down Expand Up @@ -161,20 +160,34 @@ func (s *Server) Register(mux *http.ServeMux) {
// intercept is a middleware that intercepts requests for plotsdef.js, which is
// generated dynamically based on the plots configuration. Other requests are
// forwarded as-is.
func intercept(h http.Handler, cfg *plot.Config) http.HandlerFunc {
buf := bytes.Buffer{}
buf.WriteString("export default ")
enc := json.NewEncoder(&buf)
enc.SetIndent("", " ")
if err := enc.Encode(cfg); err != nil {
panic("unexpected failure to encode plot definitions: " + err.Error())
func intercept(h http.Handler, cfg *plot.Config, extraConfig map[string]any) http.HandlerFunc {
var plotsdefjs []byte
//Using parentheses helps gc
{
buf := bytes.Buffer{}
buf.WriteString("export default ")
enc := json.NewEncoder(&buf)
enc.SetIndent("", " ")
var encodeValue any = cfg
if len(extraConfig) > 0 {
encodeValue1 := map[string]any{
"series": cfg.Series,
"events": cfg.Events,
}
for k, v := range extraConfig {
encodeValue1[k] = v
}
encodeValue = encodeValue1
}
if err := enc.Encode(encodeValue); err != nil {
panic("unexpected failure to encode plot definitions: " + err.Error())
}
buf.WriteString(";")
plotsdefjs = buf.Bytes()
}
buf.WriteString(";")
plotsdefjs := buf.Bytes()

return func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "js/plotsdef.js" {
w.Header().Add("Content-Length", strconv.Itoa(buf.Len()))
w.Header().Add("Content-Length", strconv.Itoa(len(plotsdefjs)))
w.Header().Add("Content-Type", "text/javascript; charset=utf-8")
w.Write(plotsdefjs)
return
Expand Down Expand Up @@ -228,7 +241,9 @@ func assetsFS() http.FileSystem {
func (s *Server) Index() http.HandlerFunc {
prefix := strings.TrimSuffix(s.root, "/") + "/"
assets := http.FileServer(assetsFS())
handler := intercept(assets, s.plots.Config())
handler := intercept(assets, s.plots.Config(), map[string]any{
"sendFrequency": s.intv.Milliseconds(),
})

return http.StripPrefix(prefix, handler).ServeHTTP
}
Expand All @@ -238,46 +253,51 @@ func (s *Server) Index() http.HandlerFunc {
// connection to the WebSocket protocol.
func (s *Server) Ws() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
if strings.Contains(r.Header.Get("Accept"), "/event-stream") {
// If the connection is initiated by an already open web UI
// (started by a previous process, for example), then plotsdef.js won't be
// requested. Call plots.Config() manually to ensure that s.plots internals
// are correctly initialized.
s.plots.Config()

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
s.startTransfer(w)
return
}
defer ws.Close()

// Ignore this error. This happens when the other end connection closes,
// for example. We can't handle it in any meaningful way anyways.
_ = s.sendStats(ws, s.intv)
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("This endpoint only supports text/event-stream requests"))
}
}

// sendStats sends runtime statistics over the WebSocket connection.
func (s *Server) sendStats(conn *websocket.Conn, frequency time.Duration) error {
tick := time.NewTicker(frequency)
defer tick.Stop()

// If the WebSocket connection is initiated by an already open web UI
// (started by a previous process, for example), then plotsdef.js won't be
// requested. Call plots.Config() manually to ensure that s.plots internals
// are correctly initialized.
s.plots.Config()

for range tick.C {
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
func (s *Server) startTransfer(w io.Writer) {
buffer := bytes.Buffer{}
buffer.WriteString("data: ")
callData := func() error {
if err := s.plots.WriteValues(&buffer); err == nil {
_, err = w.Write(buffer.Bytes())
if err != nil {
return err
}
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
} else {
return err
}
if err := s.plots.WriteValues(w); err != nil {
return err
}
if err := w.Close(); err != nil {
return err
return nil
}
//the first time it was sent immediately
err := callData()
if err != nil {
return
}
tick := time.NewTicker(s.intv)
defer tick.Stop()
for range tick.C {
if callData() != nil {
return
}
}

panic("unreachable")
}
Loading

0 comments on commit bdbbc1f

Please sign in to comment.