forked from tangled.org/core
this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "net/http" 7 "strconv" 8 "time" 9 10 "github.com/gorilla/websocket" 11) 12 13var upgrader = websocket.Upgrader{ 14 ReadBufferSize: 1024, 15 WriteBufferSize: 1024, 16} 17 18func (h *Handle) Events(w http.ResponseWriter, r *http.Request) { 19 l := h.l.With("handler", "OpLog") 20 l.Debug("received new connection") 21 22 conn, err := upgrader.Upgrade(w, r, nil) 23 if err != nil { 24 l.Error("websocket upgrade failed", "err", err) 25 w.WriteHeader(http.StatusInternalServerError) 26 return 27 } 28 defer conn.Close() 29 l.Debug("upgraded http to wss") 30 31 ch := h.n.Subscribe() 32 defer h.n.Unsubscribe(ch) 33 34 ctx, cancel := context.WithCancel(r.Context()) 35 defer cancel() 36 go func() { 37 for { 38 if _, _, err := conn.NextReader(); err != nil { 39 l.Error("failed to read", "err", err) 40 cancel() 41 return 42 } 43 } 44 }() 45 46 cursorStr := r.URL.Query().Get("cursor") 47 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 48 if err != nil { 49 l.Error("empty or invalid cursor, defaulting to zero", "invalidCursor", cursorStr) 50 } 51 52 // complete backfill first before going to live data 53 l.Info("going through backfill", "cursor", cursor) 54 l.Debug("going through backfill", "cursor", cursor) 55 if err := h.streamOps(conn, &cursor); err != nil { 56 l.Error("failed to backfill", "err", err) 57 return 58 } 59 for { 60 // wait for new data or timeout 61 select { 62 case <-ctx.Done(): 63 l.Debug("stopping stream: client closed connection") 64 return 65 case <-ch: 66 // we have been notified of new data 67 l.Debug("going through live data", "cursor", cursor) 68 if err := h.streamOps(conn, &cursor); err != nil { 69 l.Error("failed to stream", "err", err) 70 return 71 } 72 case <-time.After(30 * time.Second): 73 // send a keep-alive 74 l.Debug("sent keepalive") 75 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 76 l.Error("failed to write control", "err", err) 77 } 78 } 79 } 80} 81 82func (h *Handle) streamOps(conn *websocket.Conn, cursor *int64) error { 83 events, err := h.db.GetEvents(*cursor) 84 if err != nil { 85 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor) 86 return err 87 } 88 h.l.Debug("ops", "ops", events) 89 90 for _, event := range events { 91 // first extract the inner json into a map 92 var eventJson map[string]any 93 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 94 if err != nil { 95 h.l.Error("failed to unmarshal event", "err", err) 96 return err 97 } 98 99 jsonMsg, err := json.Marshal(map[string]any{ 100 "rkey": event.Rkey, 101 "nsid": event.Nsid, 102 "event": eventJson, 103 }) 104 if err != nil { 105 h.l.Error("failed to marshal record", "err", err) 106 return err 107 } 108 109 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 110 h.l.Debug("err", "err", err) 111 return err 112 } 113 *cursor = event.Created 114 } 115 116 return nil 117}