1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "net/http"
7 "strconv"
8 "time"
9
10 "github.com/gorilla/websocket"
11)
12
13var upgrader = websocket.Upgrader{
14 ReadBufferSize: 1024,
15 WriteBufferSize: 1024,
16}
17
18func (h *Handle) Events(w http.ResponseWriter, r *http.Request) {
19 l := h.l.With("handler", "OpLog")
20 l.Debug("received new connection")
21
22 conn, err := upgrader.Upgrade(w, r, nil)
23 if err != nil {
24 l.Error("websocket upgrade failed", "err", err)
25 w.WriteHeader(http.StatusInternalServerError)
26 return
27 }
28 defer conn.Close()
29 l.Debug("upgraded http to wss")
30
31 ch := h.n.Subscribe()
32 defer h.n.Unsubscribe(ch)
33
34 ctx, cancel := context.WithCancel(r.Context())
35 defer cancel()
36 go func() {
37 for {
38 if _, _, err := conn.NextReader(); err != nil {
39 l.Error("failed to read", "err", err)
40 cancel()
41 return
42 }
43 }
44 }()
45
46 cursorStr := r.URL.Query().Get("cursor")
47 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
48 if err != nil {
49 l.Error("empty or invalid cursor, defaulting to zero", "invalidCursor", cursorStr)
50 }
51
52 // complete backfill first before going to live data
53 l.Info("going through backfill", "cursor", cursor)
54 l.Debug("going through backfill", "cursor", cursor)
55 if err := h.streamOps(conn, &cursor); err != nil {
56 l.Error("failed to backfill", "err", err)
57 return
58 }
59 for {
60 // wait for new data or timeout
61 select {
62 case <-ctx.Done():
63 l.Debug("stopping stream: client closed connection")
64 return
65 case <-ch:
66 // we have been notified of new data
67 l.Debug("going through live data", "cursor", cursor)
68 if err := h.streamOps(conn, &cursor); err != nil {
69 l.Error("failed to stream", "err", err)
70 return
71 }
72 case <-time.After(30 * time.Second):
73 // send a keep-alive
74 l.Debug("sent keepalive")
75 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
76 l.Error("failed to write control", "err", err)
77 }
78 }
79 }
80}
81
82func (h *Handle) streamOps(conn *websocket.Conn, cursor *int64) error {
83 events, err := h.db.GetEvents(*cursor)
84 if err != nil {
85 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
86 return err
87 }
88 h.l.Debug("ops", "ops", events)
89
90 for _, event := range events {
91 // first extract the inner json into a map
92 var eventJson map[string]any
93 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
94 if err != nil {
95 h.l.Error("failed to unmarshal event", "err", err)
96 return err
97 }
98
99 jsonMsg, err := json.Marshal(map[string]any{
100 "rkey": event.Rkey,
101 "nsid": event.Nsid,
102 "event": eventJson,
103 })
104 if err != nil {
105 h.l.Error("failed to marshal record", "err", err)
106 return err
107 }
108
109 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
110 h.l.Debug("err", "err", err)
111 return err
112 }
113 *cursor = event.Created
114 }
115
116 return nil
117}