1package spindle
2
3import (
4 "net/http"
5 "time"
6
7 "context"
8
9 "github.com/gorilla/websocket"
10)
11
// upgrader is the websocket.Upgrader used by the Events handler to switch an
// incoming HTTP request over to a websocket connection. Only the read and
// write buffer sizes are set (1024 each); every other option keeps its zero
// value.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}
16
17func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
18 l := s.l.With("handler", "Events")
19 l.Info("received new connection")
20
21 conn, err := upgrader.Upgrade(w, r, nil)
22 if err != nil {
23 l.Error("websocket upgrade failed", "err", err)
24 w.WriteHeader(http.StatusInternalServerError)
25 return
26 }
27 defer conn.Close()
28 l.Info("upgraded http to wss")
29
30 ch := s.n.Subscribe()
31 defer s.n.Unsubscribe(ch)
32
33 ctx, cancel := context.WithCancel(r.Context())
34 defer cancel()
35 go func() {
36 for {
37 if _, _, err := conn.NextReader(); err != nil {
38 l.Error("failed to read", "err", err)
39 cancel()
40 return
41 }
42 }
43 }()
44
45 cursor := ""
46
47 // complete backfill first before going to live data
48 l.Info("going through backfill", "cursor", cursor)
49 if err := s.streamPipelines(conn, &cursor); err != nil {
50 l.Error("failed to backfill", "err", err)
51 return
52 }
53
54 for {
55 // wait for new data or timeout
56 select {
57 case <-ctx.Done():
58 l.Info("stopping stream: client closed connection")
59 return
60 case <-ch:
61 // we have been notified of new data
62 l.Info("going through live data", "cursor", cursor)
63 if err := s.streamPipelines(conn, &cursor); err != nil {
64 l.Error("failed to stream", "err", err)
65 return
66 }
67 case <-time.After(30 * time.Second):
68 // send a keep-alive
69 l.Info("sent keepalive")
70 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
71 l.Error("failed to write control", "err", err)
72 }
73 }
74 }
75}
76
77func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *string) error {
78 ops, err := s.db.GetPipelineStatusAsRecords(*cursor)
79 if err != nil {
80 s.l.Debug("err", "err", err)
81 return err
82 }
83 s.l.Debug("ops", "ops", ops)
84
85 for _, op := range ops {
86 if err := conn.WriteJSON(op); err != nil {
87 s.l.Debug("err", "err", err)
88 return err
89 }
90 *cursor = op.Rkey
91 }
92
93 return nil
94}