forked from tangled.org/core
this repo has no description
at master 2.9 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "net/http" 7 "strconv" 8 "time" 9 10 "github.com/gorilla/websocket" 11) 12 13var upgrader = websocket.Upgrader{ 14 ReadBufferSize: 1024, 15 WriteBufferSize: 1024, 16} 17 18func (h *Knot) Events(w http.ResponseWriter, r *http.Request) { 19 l := h.l.With("handler", "OpLog") 20 l.Debug("received new connection") 21 22 conn, err := upgrader.Upgrade(w, r, nil) 23 if err != nil { 24 l.Error("websocket upgrade failed", "err", err) 25 w.WriteHeader(http.StatusInternalServerError) 26 return 27 } 28 defer conn.Close() 29 l.Debug("upgraded http to wss") 30 31 ch := h.n.Subscribe() 32 defer h.n.Unsubscribe(ch) 33 34 ctx, cancel := context.WithCancel(r.Context()) 35 defer cancel() 36 go func() { 37 for { 38 if _, _, err := conn.NextReader(); err != nil { 39 l.Error("failed to read", "err", err) 40 cancel() 41 return 42 } 43 } 44 }() 45 46 defaultCursor := time.Now().UnixNano() 47 cursorStr := r.URL.Query().Get("cursor") 48 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 49 if err != nil { 50 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 51 } 52 if cursor == 0 { 53 cursor = defaultCursor 54 } 55 56 // complete backfill first before going to live data 57 l.Debug("going through backfill", "cursor", cursor) 58 if err := h.streamOps(conn, &cursor); err != nil { 59 l.Error("failed to backfill", "err", err) 60 return 61 } 62 63 for { 64 // wait for new data or timeout 65 select { 66 case <-ctx.Done(): 67 l.Debug("stopping stream: client closed connection") 68 return 69 case <-ch: 70 // we have been notified of new data 71 l.Debug("going through live data", "cursor", cursor) 72 if err := h.streamOps(conn, &cursor); err != nil { 73 l.Error("failed to stream", "err", err) 74 return 75 } 76 case <-time.After(30 * time.Second): 77 // send a keep-alive 78 l.Debug("sent keepalive") 79 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 80 l.Error("failed to write control", "err", err) 81 } 82 } 83 } 84} 85 86func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) error { 87 events, err := h.db.GetEvents(*cursor) 88 if err != nil { 89 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor) 90 return err 91 } 92 h.l.Debug("ops", "ops", events) 93 94 for _, event := range events { 95 // first extract the inner json into a map 96 var eventJson map[string]any 97 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 98 if err != nil { 99 h.l.Error("failed to unmarshal event", "err", err) 100 return err 101 } 102 103 jsonMsg, err := json.Marshal(map[string]any{ 104 "rkey": event.Rkey, 105 "nsid": event.Nsid, 106 "event": eventJson, 107 }) 108 if err != nil { 109 h.l.Error("failed to marshal record", "err", err) 110 return err 111 } 112 113 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 114 h.l.Debug("err", "err", err) 115 return err 116 } 117 *cursor = event.Created 118 } 119 120 return nil 121}