1package server
2
3import (
4 "fmt"
5
6 "github.com/bluesky-social/indigo/events"
7 "github.com/bluesky-social/indigo/lex/util"
8 "github.com/btcsuite/websocket"
9 "github.com/labstack/echo/v4"
10)
11
12func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
13 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
14 if err != nil {
15 return err
16 }
17
18 s.logger.Info("new connection", "ua", e.Request().UserAgent())
19
20 ctx := e.Request().Context()
21
22 ident := e.RealIP() + "-" + e.Request().UserAgent()
23
24 evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
25 return true
26 }, nil)
27 if err != nil {
28 return err
29 }
30 defer cancel()
31
32 header := events.EventHeader{Op: events.EvtKindMessage}
33 for evt := range evts {
34 wc, err := conn.NextWriter(websocket.BinaryMessage)
35 if err != nil {
36 return err
37 }
38
39 var obj util.CBOR
40
41 switch {
42 case evt.Error != nil:
43 header.Op = events.EvtKindErrorFrame
44 obj = evt.Error
45 case evt.RepoCommit != nil:
46 header.MsgType = "#commit"
47 obj = evt.RepoCommit
48 case evt.RepoIdentity != nil:
49 header.MsgType = "#identity"
50 obj = evt.RepoIdentity
51 case evt.RepoAccount != nil:
52 header.MsgType = "#account"
53 obj = evt.RepoAccount
54 case evt.RepoInfo != nil:
55 header.MsgType = "#info"
56 obj = evt.RepoInfo
57 default:
58 return fmt.Errorf("unrecognized event kind")
59 }
60
61 if err := header.MarshalCBOR(wc); err != nil {
62 return fmt.Errorf("failed to write header: %w", err)
63 }
64
65 if err := obj.MarshalCBOR(wc); err != nil {
66 return fmt.Errorf("failed to write event: %w", err)
67 }
68
69 if err := wc.Close(); err != nil {
70 return fmt.Errorf("failed to flush-close our event write: %w", err)
71 }
72 }
73
74 return nil
75}