Skip to content

Commit

Permalink
feat: support sse mode arl#117
Browse files Browse the repository at this point in the history
  • Loading branch information
mzzsfy committed Jan 5, 2024
1 parent 41b6a5e commit fd576db
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 44 deletions.
2 changes: 1 addition & 1 deletion _example/default/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func main() {
go example.Work()

// Register a Statsviz server on the default mux.
statsviz.Register(http.DefaultServeMux)
statsviz.Register(http.DefaultServeMux, statsviz.TransferMode("sse"))

fmt.Println("Point your browser to http://localhost:8080/debug/statsviz/")
log.Fatal(http.ListenAndServe(":8080", nil))
Expand Down
79 changes: 53 additions & 26 deletions internal/static/js/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,45 @@ let paused = false;
let show_gc = true;
let timerange = 60;

/* WebSocket connection handling */
const connect = () => {
const uri = buildWebsocketURI();
let ws = new WebSocket(uri);
console.info(`Attempting websocket connection to server at ${uri}`);

ws.onopen = () => {
const dataProcessor = {
initDone: false,
close: () => {
},
connected: false,
retrying: false,
onopen: () => {
dataProcessor.initDone = false;
dataProcessor.connected = true;
console.info("Successfully connected");
timeout = 250; // reset connection timeout for next time
};

ws.onclose = event => {
console.error(`Closed websocket connection: code ${event.code}`);
setTimeout(connect, clamp(timeout += timeout, 250, 5000));
};

ws.onerror = err => {
console.error(`Websocket error, closing connection.`);
ws.close();
};

let initDone = false;
ws.onmessage = event => {
},
onclose: event => {
dataProcessor.connected = false;
console.error(`Closed connection: code ${event.code || event}`);
if (dataProcessor.retrying) {
return
}
dataProcessor.retrying = true
setTimeout(() => {
connect()
dataProcessor.retrying=false
}, clamp(timeout += timeout, 250, 5000));
},
onerror: err => {
console.error(`error, closing connection.`, err);
dataProcessor.close();
},
onmessage: event => {
let data = JSON.parse(event.data)

if (!initDone) {
if (!dataProcessor.initDone) {
configurePlots(PlotsDef);
stats.init(PlotsDef, dataRetentionSeconds);

attachPlots();

$('#play_pause').change(() => { paused = !paused; });
$('#play_pause').change(() => {
paused = !paused;
});
$('#show_gc').change(() => {
show_gc = !show_gc;
updatePlots();
Expand All @@ -68,19 +75,39 @@ const connect = () => {
updatePlots();
});
setInterval(() => {
if (paused || ws.readyState !== WebSocket.OPEN) {
if (paused || !dataProcessor.connected) {
return
}
updatePlots(PlotsDef.events);
}, 1000)
initDone = true;
dataProcessor.initDone = true;
stats.pushData(data);
updatePlots(PlotsDef.events);
return
}
stats.pushData(data);
}
}
/* WebSocket connection handling */
const connect = () => {
if (PlotsDef.transferMode === "sse") {
const url = window.location.pathname + "ws";
const eventSource = new EventSource(url);
console.info(`Attempting sse connection to server at ${url}`);
for (let event in dataProcessor) {
eventSource[event] = dataProcessor[event];
}
dataProcessor.close = eventSource.close
} else {
const uri = buildWebsocketURI();
let ws = new WebSocket(uri);
console.info(`Attempting websocket connection to server at ${uri}`);
for (let event in dataProcessor) {
ws[event] = dataProcessor[event];
}
dataProcessor.close = ws.close
}
}

connect();

Expand Down
99 changes: 83 additions & 16 deletions statsviz.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ func Register(mux *http.ServeMux, opts ...Option) error {
//
// The zero value is not a valid Server, use NewServer to create a valid one.
type Server struct {
intv time.Duration // interval between consecutive metrics emission
root string // HTTP path root
plots *plot.List // plots shown on the user interface
userPlots []plot.UserPlot
intv time.Duration // interval between consecutive metrics emission
root string // HTTP path root
plots *plot.List // plots shown on the user interface
userPlots []plot.UserPlot
transferMode string // transfer mode, default is "ws", can be "sse"
}

// NewServer constructs a new Statsviz Server with the provided options, or the
Expand Down Expand Up @@ -143,6 +144,14 @@ func Root(path string) Option {
}
}

// TransferMode changes the transfer mode,support "ws" and "sse".
func TransferMode(mode string) Option {
return func(s *Server) error {
s.transferMode = mode
return nil
}
}

// TimeseriesPlot adds a new time series plot to Statsviz. This options can
// be added multiple times.
func TimeseriesPlot(tsp TimeSeriesPlot) Option {
Expand All @@ -161,20 +170,34 @@ func (s *Server) Register(mux *http.ServeMux) {
// intercept is a middleware that intercepts requests for plotsdef.js, which is
// generated dynamically based on the plots configuration. Other requests are
// forwarded as-is.
func intercept(h http.Handler, cfg *plot.Config) http.HandlerFunc {
buf := bytes.Buffer{}
buf.WriteString("export default ")
enc := json.NewEncoder(&buf)
enc.SetIndent("", " ")
if err := enc.Encode(cfg); err != nil {
panic("unexpected failure to encode plot definitions: " + err.Error())
func intercept(h http.Handler, cfg *plot.Config, extraConfig map[string]any) http.HandlerFunc {
var plotsdefjs []byte
//Using parentheses helps gc
{
buf := bytes.Buffer{}
buf.WriteString("export default ")
enc := json.NewEncoder(&buf)
enc.SetIndent("", " ")
var encodeValue any = cfg
if len(extraConfig) > 0 {
encodeValue1 := map[string]any{
"series": cfg.Series,
"events": cfg.Events,
}
for k, v := range extraConfig {
encodeValue1[k] = v
}
encodeValue = encodeValue1
}
if err := enc.Encode(encodeValue); err != nil {
panic("unexpected failure to encode plot definitions: " + err.Error())
}
buf.WriteString(";")
plotsdefjs = buf.Bytes()
}
buf.WriteString(";")
plotsdefjs := buf.Bytes()

return func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "js/plotsdef.js" {
w.Header().Add("Content-Length", strconv.Itoa(buf.Len()))
w.Header().Add("Content-Length", strconv.Itoa(len(plotsdefjs)))
w.Header().Add("Content-Type", "text/javascript; charset=utf-8")
w.Write(plotsdefjs)
return
Expand Down Expand Up @@ -228,7 +251,9 @@ func assetsFS() http.FileSystem {
func (s *Server) Index() http.HandlerFunc {
prefix := strings.TrimSuffix(s.root, "/") + "/"
assets := http.FileServer(assetsFS())
handler := intercept(assets, s.plots.Config())
handler := intercept(assets, s.plots.Config(), map[string]any{
"transferMode": s.transferMode,
})

return http.StripPrefix(prefix, handler).ServeHTTP
}
Expand All @@ -238,6 +263,48 @@ func (s *Server) Index() http.HandlerFunc {
// connection to the WebSocket protocol.
func (s *Server) Ws() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Accept") == "text/event-stream" {
tick := time.NewTicker(s.intv)
defer tick.Stop()

// If the WebSocket connection is initiated by an already open web UI
// (started by a previous process, for example), then plotsdef.js won't be
// requested. Call plots.Config() manually to ensure that s.plots internals
// are correctly initialized.
s.plots.Config()

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
f := func() bool {
// 尝试将事件写入HTTP响应
buffer := bytes.Buffer{}
buffer.WriteString("data: ")
if err := s.plots.WriteValues(&buffer); err != nil {
//fmt.Println("Error plots.WriteValues:", err)
return true
}
buffer.WriteString("\n\n")
_, err := w.Write(buffer.Bytes())
if err != nil {
// 发生错误,可能是连接断开
//fmt.Println("Error writing to response:", err)
return true
}
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
return false
}
f()
// 在HTTP响应中循环写入事件
for range tick.C {
if f() {
return
}
}
panic("unreachable")
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
Expand Down
2 changes: 1 addition & 1 deletion statsviz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func Test_intercept(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "http://example.com/debug/statsviz/js/plotsdef.js", nil)

srv := newServer(t)
intercept(srv.Index(), srv.plots.Config())(w, req)
intercept(srv.Index(), srv.plots.Config(), nil)(w, req)

resp := w.Result()
if resp.StatusCode != http.StatusOK {
Expand Down

0 comments on commit fd576db

Please sign in to comment.