forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle 2 3import ( 4 "net/http" 5 "time" 6 7 "context" 8 9 "github.com/gorilla/websocket" 10) 11 12var upgrader = websocket.Upgrader{ 13 ReadBufferSize: 1024, 14 WriteBufferSize: 1024, 15} 16 17func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 18 l := s.l.With("handler", "Events") 19 l.Info("received new connection") 20 21 conn, err := upgrader.Upgrade(w, r, nil) 22 if err != nil { 23 l.Error("websocket upgrade failed", "err", err) 24 w.WriteHeader(http.StatusInternalServerError) 25 return 26 } 27 defer conn.Close() 28 l.Info("upgraded http to wss") 29 30 ch := s.n.Subscribe() 31 defer s.n.Unsubscribe(ch) 32 33 ctx, cancel := context.WithCancel(r.Context()) 34 defer cancel() 35 go func() { 36 for { 37 if _, _, err := conn.NextReader(); err != nil { 38 l.Error("failed to read", "err", err) 39 cancel() 40 return 41 } 42 } 43 }() 44 45 cursor := "" 46 47 // complete backfill first before going to live data 48 l.Info("going through backfill", "cursor", cursor) 49 if err := s.streamPipelines(conn, &cursor); err != nil { 50 l.Error("failed to backfill", "err", err) 51 return 52 } 53 54 for { 55 // wait for new data or timeout 56 select { 57 case <-ctx.Done(): 58 l.Info("stopping stream: client closed connection") 59 return 60 case <-ch: 61 // we have been notified of new data 62 l.Info("going through live data", "cursor", cursor) 63 if err := s.streamPipelines(conn, &cursor); err != nil { 64 l.Error("failed to stream", "err", err) 65 return 66 } 67 case <-time.After(30 * time.Second): 68 // send a keep-alive 69 l.Info("sent keepalive") 70 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 71 l.Error("failed to write control", "err", err) 72 } 73 } 74 } 75} 76 77func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *string) error { 78 ops, err := s.db.GetPipelineStatusAsRecords(*cursor) 79 if err != nil { 80 s.l.Debug("err", "err", err) 81 return err 82 } 83 s.l.Debug("ops", "ops", ops) 84 85 for _, op := range ops { 86 if err := conn.WriteJSON(op); err != nil { 87 s.l.Debug("err", "err", err) 88 return err 89 } 90 *cursor = op.Rkey 91 } 92 93 return nil 94}