1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "net/http"
7 "strconv"
8 "time"
9
10 "github.com/gorilla/websocket"
11)
12
13var upgrader = websocket.Upgrader{
14 ReadBufferSize: 1024,
15 WriteBufferSize: 1024,
16}
17
18func (h *Knot) Events(w http.ResponseWriter, r *http.Request) {
19 l := h.l.With("handler", "OpLog")
20 l.Debug("received new connection")
21
22 conn, err := upgrader.Upgrade(w, r, nil)
23 if err != nil {
24 l.Error("websocket upgrade failed", "err", err)
25 w.WriteHeader(http.StatusInternalServerError)
26 return
27 }
28 defer conn.Close()
29 l.Debug("upgraded http to wss")
30
31 ch := h.n.Subscribe()
32 defer h.n.Unsubscribe(ch)
33
34 ctx, cancel := context.WithCancel(r.Context())
35 defer cancel()
36 go func() {
37 for {
38 if _, _, err := conn.NextReader(); err != nil {
39 l.Error("failed to read", "err", err)
40 cancel()
41 return
42 }
43 }
44 }()
45
46 defaultCursor := time.Now().UnixNano()
47 cursorStr := r.URL.Query().Get("cursor")
48 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
49 if err != nil {
50 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
51 }
52 if cursor == 0 {
53 cursor = defaultCursor
54 }
55
56 // complete backfill first before going to live data
57 l.Debug("going through backfill", "cursor", cursor)
58 if err := h.streamOps(conn, &cursor); err != nil {
59 l.Error("failed to backfill", "err", err)
60 return
61 }
62
63 for {
64 // wait for new data or timeout
65 select {
66 case <-ctx.Done():
67 l.Debug("stopping stream: client closed connection")
68 return
69 case <-ch:
70 // we have been notified of new data
71 l.Debug("going through live data", "cursor", cursor)
72 if err := h.streamOps(conn, &cursor); err != nil {
73 l.Error("failed to stream", "err", err)
74 return
75 }
76 case <-time.After(30 * time.Second):
77 // send a keep-alive
78 l.Debug("sent keepalive")
79 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
80 l.Error("failed to write control", "err", err)
81 }
82 }
83 }
84}
85
86func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) error {
87 events, err := h.db.GetEvents(*cursor)
88 if err != nil {
89 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
90 return err
91 }
92 h.l.Debug("ops", "ops", events)
93
94 for _, event := range events {
95 // first extract the inner json into a map
96 var eventJson map[string]any
97 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
98 if err != nil {
99 h.l.Error("failed to unmarshal event", "err", err)
100 return err
101 }
102
103 jsonMsg, err := json.Marshal(map[string]any{
104 "rkey": event.Rkey,
105 "nsid": event.Nsid,
106 "event": eventJson,
107 })
108 if err != nil {
109 h.l.Error("failed to marshal record", "err", err)
110 return err
111 }
112
113 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
114 h.l.Debug("err", "err", err)
115 return err
116 }
117 *cursor = event.Created
118 }
119
120 return nil
121}