1package server
2
3import (
4 "fmt"
5 "net/http"
6
7 "github.com/bluesky-social/indigo/events"
8 "github.com/bluesky-social/indigo/lex/util"
9 "github.com/btcsuite/websocket"
10 "github.com/labstack/echo/v4"
11)
12
13var upgrader = websocket.Upgrader{
14 ReadBufferSize: 1024,
15 WriteBufferSize: 1024,
16 CheckOrigin: func(r *http.Request) bool {
17 return true
18 },
19}
20
21func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
22 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
23 if err != nil {
24 return err
25 }
26
27 s.logger.Info("new connection", "ua", e.Request().UserAgent())
28
29 ctx := e.Request().Context()
30
31 ident := e.RealIP() + "-" + e.Request().UserAgent()
32
33 evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
34 return true
35 }, nil)
36 if err != nil {
37 return err
38 }
39 defer cancel()
40
41 header := events.EventHeader{Op: events.EvtKindMessage}
42 for evt := range evts {
43 wc, err := conn.NextWriter(websocket.BinaryMessage)
44 if err != nil {
45 return err
46 }
47
48 var obj util.CBOR
49
50 switch {
51 case evt.Error != nil:
52 header.Op = events.EvtKindErrorFrame
53 obj = evt.Error
54 case evt.RepoCommit != nil:
55 header.MsgType = "#commit"
56 obj = evt.RepoCommit
57 case evt.RepoHandle != nil:
58 header.MsgType = "#handle"
59 obj = evt.RepoHandle
60 case evt.RepoIdentity != nil:
61 header.MsgType = "#identity"
62 obj = evt.RepoIdentity
63 case evt.RepoAccount != nil:
64 header.MsgType = "#account"
65 obj = evt.RepoAccount
66 case evt.RepoInfo != nil:
67 header.MsgType = "#info"
68 obj = evt.RepoInfo
69 case evt.RepoMigrate != nil:
70 header.MsgType = "#migrate"
71 obj = evt.RepoMigrate
72 case evt.RepoTombstone != nil:
73 header.MsgType = "#tombstone"
74 obj = evt.RepoTombstone
75 default:
76 return fmt.Errorf("unrecognized event kind")
77 }
78
79 if err := header.MarshalCBOR(wc); err != nil {
80 return fmt.Errorf("failed to write header: %w", err)
81 }
82
83 if err := obj.MarshalCBOR(wc); err != nil {
84 return fmt.Errorf("failed to write event: %w", err)
85 }
86
87 if err := wc.Close(); err != nil {
88 return fmt.Errorf("failed to flush-close our event write: %w", err)
89 }
90 }
91
92 return nil
93}