forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "net/http"
7 "time"
8
9 "github.com/gorilla/websocket"
10)
11
12var upgrader = websocket.Upgrader{
13 ReadBufferSize: 1024,
14 WriteBufferSize: 1024,
15}
16
17func (h *Handle) Events(w http.ResponseWriter, r *http.Request) {
18 l := h.l.With("handler", "OpLog")
19 l.Info("received new connection")
20
21 conn, err := upgrader.Upgrade(w, r, nil)
22 if err != nil {
23 l.Error("websocket upgrade failed", "err", err)
24 w.WriteHeader(http.StatusInternalServerError)
25 return
26 }
27 defer conn.Close()
28 l.Info("upgraded http to wss")
29
30 ch := h.n.Subscribe()
31 defer h.n.Unsubscribe(ch)
32
33 ctx, cancel := context.WithCancel(r.Context())
34 defer cancel()
35 go func() {
36 for {
37 if _, _, err := conn.NextReader(); err != nil {
38 l.Error("failed to read", "err", err)
39 cancel()
40 return
41 }
42 }
43 }()
44
45 cursor := r.URL.Query().Get("cursor")
46
47 // complete backfill first before going to live data
48 l.Info("going through backfill", "cursor", cursor)
49 if err := h.streamOps(conn, &cursor); err != nil {
50 l.Error("failed to backfill", "err", err)
51 return
52 }
53
54 for {
55 // wait for new data or timeout
56 select {
57 case <-ctx.Done():
58 l.Info("stopping stream: client closed connection")
59 return
60 case <-ch:
61 // we have been notified of new data
62 l.Info("going through live data", "cursor", cursor)
63 if err := h.streamOps(conn, &cursor); err != nil {
64 l.Error("failed to stream", "err", err)
65 return
66 }
67 case <-time.After(30 * time.Second):
68 // send a keep-alive
69 l.Info("sent keepalive")
70 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
71 l.Error("failed to write control", "err", err)
72 }
73 }
74 }
75}
76
77func (h *Handle) streamOps(conn *websocket.Conn, cursor *string) error {
78 events, err := h.db.GetEvents(*cursor)
79 if err != nil {
80 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
81 return err
82 }
83 h.l.Debug("ops", "ops", events)
84
85 for _, event := range events {
86 // first extract the inner json into a map
87 var eventJson map[string]any
88 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
89 if err != nil {
90 h.l.Error("failed to unmarshal event", "err", err)
91 return err
92 }
93
94 jsonMsg, err := json.Marshal(map[string]any{
95 "rkey": event.Rkey,
96 "nsid": event.Nsid,
97 "event": eventJson,
98 })
99 if err != nil {
100 h.l.Error("failed to marshal record", "err", err)
101 return err
102 }
103
104 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
105 h.l.Debug("err", "err", err)
106 return err
107 }
108 *cursor = event.Rkey
109 }
110
111 return nil
112}