forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 2.8 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "net/http" 7 "strconv" 8 "time" 9 10 "github.com/gorilla/websocket" 11 "tangled.org/core/log" 12) 13 14var upgrader = websocket.Upgrader{ 15 ReadBufferSize: 1024, 16 WriteBufferSize: 1024, 17} 18 19func (h *Knot) Events(w http.ResponseWriter, r *http.Request) { 20 l := log.SubLogger(h.l, "eventstream") 21 l.Debug("received new connection") 22 23 conn, err := upgrader.Upgrade(w, r, nil) 24 if err != nil { 25 l.Error("websocket upgrade failed", "err", err) 26 w.WriteHeader(http.StatusInternalServerError) 27 return 28 } 29 defer conn.Close() 30 l.Debug("upgraded http to wss") 31 32 ch := h.n.Subscribe() 33 defer h.n.Unsubscribe(ch) 34 35 ctx, cancel := context.WithCancel(r.Context()) 36 defer cancel() 37 go func() { 38 for { 39 if _, _, err := conn.NextReader(); err != nil { 40 l.Error("failed to read", "err", err) 41 cancel() 42 return 43 } 44 } 45 }() 46 47 defaultCursor := time.Now().UnixNano() 48 cursorStr := r.URL.Query().Get("cursor") 49 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 50 if err != nil { 51 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 52 } 53 if cursor == 0 { 54 cursor = defaultCursor 55 } 56 57 // complete backfill first before going to live data 58 l.Debug("going through backfill", "cursor", cursor) 59 if err := h.streamOps(conn, &cursor); err != nil { 60 l.Error("failed to backfill", "err", err) 61 return 62 } 63 64 for { 65 // wait for new data or timeout 66 select { 67 case <-ctx.Done(): 68 l.Debug("stopping stream: client closed connection") 69 return 70 case <-ch: 71 // we have been notified of new data 72 l.Debug("going through live data", "cursor", cursor) 73 if err := h.streamOps(conn, &cursor); err != nil { 74 l.Error("failed to stream", "err", err) 75 return 76 } 77 case <-time.After(30 * time.Second): 78 // send a keep-alive 79 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 80 l.Error("failed to write control", "err", err) 81 } 82 } 83 } 84} 85 86func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) error { 87 events, err := h.db.GetEvents(*cursor) 88 if err != nil { 89 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor) 90 return err 91 } 92 93 for _, event := range events { 94 // first extract the inner json into a map 95 var eventJson map[string]any 96 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 97 if err != nil { 98 h.l.Error("failed to unmarshal event", "err", err) 99 return err 100 } 101 102 jsonMsg, err := json.Marshal(map[string]any{ 103 "rkey": event.Rkey, 104 "nsid": event.Nsid, 105 "event": eventJson, 106 }) 107 if err != nil { 108 h.l.Error("failed to marshal record", "err", err) 109 return err 110 } 111 112 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 113 h.l.Debug("err", "err", err) 114 return err 115 } 116 *cursor = event.Created 117 } 118 119 return nil 120}