1package knotserver
2
3import (
4 "context"
5 "net/http"
6 "time"
7
8 "github.com/gorilla/websocket"
9)
10
11var upgrader = websocket.Upgrader{
12 ReadBufferSize: 1024,
13 WriteBufferSize: 1024,
14}
15
16func (h *Handle) OpLog(w http.ResponseWriter, r *http.Request) {
17 l := h.l.With("handler", "OpLog")
18 l.Info("received new connection")
19
20 conn, err := upgrader.Upgrade(w, r, nil)
21 if err != nil {
22 l.Error("websocket upgrade failed", "err", err)
23 w.WriteHeader(http.StatusInternalServerError)
24 return
25 }
26 defer conn.Close()
27 l.Info("upgraded http to wss")
28
29 ch := h.n.Subscribe()
30 defer h.n.Unsubscribe(ch)
31
32 ctx, cancel := context.WithCancel(r.Context())
33 defer cancel()
34 go func() {
35 for {
36 if _, _, err := conn.NextReader(); err != nil {
37 l.Error("failed to read", "err", err)
38 cancel()
39 return
40 }
41 }
42 }()
43
44 cursor := ""
45
46 // complete backfill first before going to live data
47 l.Info("going through backfill", "cursor", cursor)
48 if err := h.streamOps(conn, &cursor); err != nil {
49 l.Error("failed to backfill", "err", err)
50 return
51 }
52
53 for {
54 // wait for new data or timeout
55 select {
56 case <-ctx.Done():
57 l.Info("stopping stream: client closed connection")
58 return
59 case <-ch:
60 // we have been notified of new data
61 l.Info("going through live data", "cursor", cursor)
62 if err := h.streamOps(conn, &cursor); err != nil {
63 l.Error("failed to stream", "err", err)
64 return
65 }
66 case <-time.After(30 * time.Second):
67 // send a keep-alive
68 l.Info("sent keepalive")
69 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
70 l.Error("failed to write control", "err", err)
71 }
72 }
73 }
74}
75
76func (h *Handle) streamOps(conn *websocket.Conn, cursor *string) error {
77 ops, err := h.db.GetOps(*cursor)
78 if err != nil {
79 h.l.Debug("err", "err", err)
80 return err
81 }
82 h.l.Debug("ops", "ops", ops)
83
84 for _, op := range ops {
85 if err := conn.WriteJSON(op); err != nil {
86 h.l.Debug("err", "err", err)
87 return err
88 }
89 *cursor = op.Tid
90 }
91
92 return nil
93}