1package server
2
3import (
4 "context"
5 "time"
6
7 "github.com/bluesky-social/indigo/events"
8 "github.com/bluesky-social/indigo/lex/util"
9 "github.com/btcsuite/websocket"
10 "github.com/labstack/echo/v4"
11)
12
13func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
14 ctx := e.Request().Context()
15 logger := s.logger.With("component", "subscribe-repos-websocket")
16
17 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
18 if err != nil {
19 logger.Error("unable to establish websocket with relay", "err", err)
20 return err
21 }
22
23 ident := e.RealIP() + "-" + e.Request().UserAgent()
24 logger = logger.With("ident", ident)
25 logger.Info("new connection established")
26
27 evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
28 return true
29 }, nil)
30 if err != nil {
31 return err
32 }
33 defer cancel()
34
35 header := events.EventHeader{Op: events.EvtKindMessage}
36 for evt := range evts {
37 wc, err := conn.NextWriter(websocket.BinaryMessage)
38 if err != nil {
39 logger.Error("error writing message to relay", "err", err)
40 break
41 }
42
43 if ctx.Err() != nil {
44 logger.Error("context error", "err", err)
45 break
46 }
47
48 var obj util.CBOR
49 switch {
50 case evt.Error != nil:
51 header.Op = events.EvtKindErrorFrame
52 obj = evt.Error
53 case evt.RepoCommit != nil:
54 header.MsgType = "#commit"
55 obj = evt.RepoCommit
56 case evt.RepoIdentity != nil:
57 header.MsgType = "#identity"
58 obj = evt.RepoIdentity
59 case evt.RepoAccount != nil:
60 header.MsgType = "#account"
61 obj = evt.RepoAccount
62 case evt.RepoInfo != nil:
63 header.MsgType = "#info"
64 obj = evt.RepoInfo
65 default:
66 logger.Warn("unrecognized event kind")
67 return nil
68 }
69
70 if err := header.MarshalCBOR(wc); err != nil {
71 logger.Error("failed to write header to relay", "err", err)
72 break
73 }
74
75 if err := obj.MarshalCBOR(wc); err != nil {
76 logger.Error("failed to write event to relay", "err", err)
77 break
78 }
79
80 if err := wc.Close(); err != nil {
81 logger.Error("failed to flush-close our event write", "err", err)
82 break
83 }
84 }
85
86 // we should tell the relay to request a new crawl at this point if we got disconnected
87 // use a new context since the old one might be cancelled at this point
88 ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
89 defer cancel()
90 if err := s.requestCrawl(ctx); err != nil {
91 logger.Error("error requesting crawls", "err", err)
92 }
93
94 return nil
95}