forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "net/http"
7 "strconv"
8 "time"
9
10 "github.com/gorilla/websocket"
11 "tangled.org/core/log"
12)
13
14var upgrader = websocket.Upgrader{
15 ReadBufferSize: 1024,
16 WriteBufferSize: 1024,
17}
18
19func (h *Knot) Events(w http.ResponseWriter, r *http.Request) {
20 l := log.SubLogger(h.l, "eventstream")
21 l.Debug("received new connection")
22
23 conn, err := upgrader.Upgrade(w, r, nil)
24 if err != nil {
25 l.Error("websocket upgrade failed", "err", err)
26 w.WriteHeader(http.StatusInternalServerError)
27 return
28 }
29 defer conn.Close()
30 l.Debug("upgraded http to wss")
31
32 ch := h.n.Subscribe()
33 defer h.n.Unsubscribe(ch)
34
35 ctx, cancel := context.WithCancel(r.Context())
36 defer cancel()
37 go func() {
38 for {
39 if _, _, err := conn.NextReader(); err != nil {
40 l.Error("failed to read", "err", err)
41 cancel()
42 return
43 }
44 }
45 }()
46
47 defaultCursor := time.Now().UnixNano()
48 cursorStr := r.URL.Query().Get("cursor")
49 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
50 if err != nil {
51 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
52 }
53 if cursor == 0 {
54 cursor = defaultCursor
55 }
56
57 // complete backfill first before going to live data
58 l.Debug("going through backfill", "cursor", cursor)
59 if err := h.streamOps(conn, &cursor); err != nil {
60 l.Error("failed to backfill", "err", err)
61 return
62 }
63
64 for {
65 // wait for new data or timeout
66 select {
67 case <-ctx.Done():
68 l.Debug("stopping stream: client closed connection")
69 return
70 case <-ch:
71 // we have been notified of new data
72 l.Debug("going through live data", "cursor", cursor)
73 if err := h.streamOps(conn, &cursor); err != nil {
74 l.Error("failed to stream", "err", err)
75 return
76 }
77 case <-time.After(30 * time.Second):
78 // send a keep-alive
79 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
80 l.Error("failed to write control", "err", err)
81 }
82 }
83 }
84}
85
86func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) error {
87 events, err := h.db.GetEvents(*cursor)
88 if err != nil {
89 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
90 return err
91 }
92
93 for _, event := range events {
94 // first extract the inner json into a map
95 var eventJson map[string]any
96 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
97 if err != nil {
98 h.l.Error("failed to unmarshal event", "err", err)
99 return err
100 }
101
102 jsonMsg, err := json.Marshal(map[string]any{
103 "rkey": event.Rkey,
104 "nsid": event.Nsid,
105 "event": eventJson,
106 })
107 if err != nil {
108 h.l.Error("failed to marshal record", "err", err)
109 return err
110 }
111
112 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
113 h.l.Debug("err", "err", err)
114 return err
115 }
116 *cursor = event.Created
117 }
118
119 return nil
120}