forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "net/http" 7 "time" 8 9 "github.com/gorilla/websocket" 10) 11 12var upgrader = websocket.Upgrader{ 13 ReadBufferSize: 1024, 14 WriteBufferSize: 1024, 15} 16 17func (h *Handle) Events(w http.ResponseWriter, r *http.Request) { 18 l := h.l.With("handler", "OpLog") 19 l.Info("received new connection") 20 21 conn, err := upgrader.Upgrade(w, r, nil) 22 if err != nil { 23 l.Error("websocket upgrade failed", "err", err) 24 w.WriteHeader(http.StatusInternalServerError) 25 return 26 } 27 defer conn.Close() 28 l.Info("upgraded http to wss") 29 30 ch := h.n.Subscribe() 31 defer h.n.Unsubscribe(ch) 32 33 ctx, cancel := context.WithCancel(r.Context()) 34 defer cancel() 35 go func() { 36 for { 37 if _, _, err := conn.NextReader(); err != nil { 38 l.Error("failed to read", "err", err) 39 cancel() 40 return 41 } 42 } 43 }() 44 45 cursor := r.URL.Query().Get("cursor") 46 47 // complete backfill first before going to live data 48 l.Info("going through backfill", "cursor", cursor) 49 if err := h.streamOps(conn, &cursor); err != nil { 50 l.Error("failed to backfill", "err", err) 51 return 52 } 53 54 for { 55 // wait for new data or timeout 56 select { 57 case <-ctx.Done(): 58 l.Info("stopping stream: client closed connection") 59 return 60 case <-ch: 61 // we have been notified of new data 62 l.Info("going through live data", "cursor", cursor) 63 if err := h.streamOps(conn, &cursor); err != nil { 64 l.Error("failed to stream", "err", err) 65 return 66 } 67 case <-time.After(30 * time.Second): 68 // send a keep-alive 69 l.Info("sent keepalive") 70 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 71 l.Error("failed to write control", "err", err) 72 } 73 } 74 } 75} 76 77func (h *Handle) streamOps(conn *websocket.Conn, cursor *string) error { 78 events, err := h.db.GetEvents(*cursor) 79 if err != nil { 80 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor) 81 return err 82 } 83 h.l.Debug("ops", "ops", events) 84 85 for _, event := range events { 86 // first extract the inner json into a map 87 var eventJson map[string]any 88 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 89 if err != nil { 90 h.l.Error("failed to unmarshal event", "err", err) 91 return err 92 } 93 94 jsonMsg, err := json.Marshal(map[string]any{ 95 "rkey": event.Rkey, 96 "nsid": event.Nsid, 97 "event": eventJson, 98 }) 99 if err != nil { 100 h.l.Error("failed to marshal record", "err", err) 101 return err 102 } 103 104 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 105 h.l.Debug("err", "err", err) 106 return err 107 } 108 *cursor = event.Rkey 109 } 110 111 return nil 112}