forked from tangled.org/core
this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "net/http" 6 "time" 7 8 "github.com/gorilla/websocket" 9) 10 11var upgrader = websocket.Upgrader{ 12 ReadBufferSize: 1024, 13 WriteBufferSize: 1024, 14} 15 16func (h *Handle) OpLog(w http.ResponseWriter, r *http.Request) { 17 l := h.l.With("handler", "OpLog") 18 l.Info("received new connection") 19 20 conn, err := upgrader.Upgrade(w, r, nil) 21 if err != nil { 22 l.Error("websocket upgrade failed", "err", err) 23 w.WriteHeader(http.StatusInternalServerError) 24 return 25 } 26 defer conn.Close() 27 l.Info("upgraded http to wss") 28 29 ch := h.n.Subscribe() 30 defer h.n.Unsubscribe(ch) 31 32 ctx, cancel := context.WithCancel(r.Context()) 33 defer cancel() 34 go func() { 35 for { 36 if _, _, err := conn.NextReader(); err != nil { 37 l.Error("failed to read", "err", err) 38 cancel() 39 return 40 } 41 } 42 }() 43 44 cursor := "" 45 46 // complete backfill first before going to live data 47 l.Info("going through backfill", "cursor", cursor) 48 if err := h.streamOps(conn, &cursor); err != nil { 49 l.Error("failed to backfill", "err", err) 50 return 51 } 52 53 for { 54 // wait for new data or timeout 55 select { 56 case <-ctx.Done(): 57 l.Info("stopping stream: client closed connection") 58 return 59 case <-ch: 60 // we have been notified of new data 61 l.Info("going through live data", "cursor", cursor) 62 if err := h.streamOps(conn, &cursor); err != nil { 63 l.Error("failed to stream", "err", err) 64 return 65 } 66 case <-time.After(30 * time.Second): 67 // send a keep-alive 68 l.Info("sent keepalive") 69 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 70 l.Error("failed to write control", "err", err) 71 } 72 } 73 } 74} 75 76func (h *Handle) streamOps(conn *websocket.Conn, cursor *string) error { 77 ops, err := h.db.GetOps(*cursor) 78 if err != nil { 79 h.l.Debug("err", "err", err) 80 return err 81 } 82 h.l.Debug("ops", "ops", ops) 83 84 for _, op := range ops { 85 if err := conn.WriteJSON(op); err != nil { 86 h.l.Debug("err", "err", err) 87 return err 88 } 89 *cursor = op.Tid 90 } 91 92 return nil 93}