forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle 2 3import ( 4 "net/http" 5 "time" 6 7 "github.com/gorilla/websocket" 8 "golang.org/x/net/context" 9) 10 11var upgrader = websocket.Upgrader{ 12 ReadBufferSize: 1024, 13 WriteBufferSize: 1024, 14} 15 16func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 17 l := s.l.With("handler", "Events") 18 l.Info("received new connection") 19 20 conn, err := upgrader.Upgrade(w, r, nil) 21 if err != nil { 22 l.Error("websocket upgrade failed", "err", err) 23 w.WriteHeader(http.StatusInternalServerError) 24 return 25 } 26 defer conn.Close() 27 l.Info("upgraded http to wss") 28 29 ch := s.n.Subscribe() 30 defer s.n.Unsubscribe(ch) 31 32 ctx, cancel := context.WithCancel(r.Context()) 33 defer cancel() 34 go func() { 35 for { 36 if _, _, err := conn.NextReader(); err != nil { 37 l.Error("failed to read", "err", err) 38 cancel() 39 return 40 } 41 } 42 }() 43 44 cursor := "" 45 46 // complete backfill first before going to live data 47 l.Info("going through backfill", "cursor", cursor) 48 if err := s.streamPipelines(conn, &cursor); err != nil { 49 l.Error("failed to backfill", "err", err) 50 return 51 } 52 53 for { 54 // wait for new data or timeout 55 select { 56 case <-ctx.Done(): 57 l.Info("stopping stream: client closed connection") 58 return 59 case <-ch: 60 // we have been notified of new data 61 l.Info("going through live data", "cursor", cursor) 62 if err := s.streamPipelines(conn, &cursor); err != nil { 63 l.Error("failed to stream", "err", err) 64 return 65 } 66 case <-time.After(30 * time.Second): 67 // send a keep-alive 68 l.Info("sent keepalive") 69 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 70 l.Error("failed to write control", "err", err) 71 } 72 } 73 } 74} 75 76func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *string) error { 77 ops, err := s.db.GetPipelines(*cursor) 78 if err != nil { 79 s.l.Debug("err", "err", err) 80 return err 81 } 82 s.l.Debug("ops", "ops", ops) 83 84 for _, op := range ops { 85 if err := conn.WriteJSON(op); err != nil { 86 s.l.Debug("err", "err", err) 87 return err 88 } 89 *cursor = op.Rkey 90 } 91 92 return nil 93}