An atproto PDS written in Go
at main 2.5 kB view raw
1package server 2 3import ( 4 "context" 5 "time" 6 7 "github.com/bluesky-social/indigo/events" 8 "github.com/bluesky-social/indigo/lex/util" 9 "github.com/btcsuite/websocket" 10 "github.com/labstack/echo/v4" 11) 12 13func (s *Server) handleSyncSubscribeRepos(e echo.Context) error { 14 ctx := e.Request().Context() 15 logger := s.logger.With("component", "subscribe-repos-websocket") 16 17 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10) 18 if err != nil { 19 logger.Error("unable to establish websocket with relay", "err", err) 20 return err 21 } 22 23 ident := e.RealIP() + "-" + e.Request().UserAgent() 24 logger = logger.With("ident", ident) 25 logger.Info("new connection established") 26 27 evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 28 return true 29 }, nil) 30 if err != nil { 31 return err 32 } 33 defer cancel() 34 35 header := events.EventHeader{Op: events.EvtKindMessage} 36 for evt := range evts { 37 wc, err := conn.NextWriter(websocket.BinaryMessage) 38 if err != nil { 39 logger.Error("error writing message to relay", "err", err) 40 break 41 } 42 43 if ctx.Err() != nil { 44 logger.Error("context error", "err", err) 45 break 46 } 47 48 var obj util.CBOR 49 switch { 50 case evt.Error != nil: 51 header.Op = events.EvtKindErrorFrame 52 obj = evt.Error 53 case evt.RepoCommit != nil: 54 header.MsgType = "#commit" 55 obj = evt.RepoCommit 56 case evt.RepoIdentity != nil: 57 header.MsgType = "#identity" 58 obj = evt.RepoIdentity 59 case evt.RepoAccount != nil: 60 header.MsgType = "#account" 61 obj = evt.RepoAccount 62 case evt.RepoInfo != nil: 63 header.MsgType = "#info" 64 obj = evt.RepoInfo 65 default: 66 logger.Warn("unrecognized event kind") 67 return nil 68 } 69 70 if err := header.MarshalCBOR(wc); err != nil { 71 logger.Error("failed to write header to relay", "err", err) 72 break 73 } 74 75 if err := obj.MarshalCBOR(wc); err != nil { 76 logger.Error("failed to write event to relay", "err", err) 77 break 78 } 79 80 if err := wc.Close(); err != nil { 81 logger.Error("failed to flush-close our event write", "err", err) 82 break 83 } 84 } 85 86 // we should tell the relay to request a new crawl at this point if we got disconnected 87 // use a new context since the old one might be cancelled at this point 88 ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) 89 defer cancel() 90 if err := s.requestCrawl(ctx); err != nil { 91 logger.Error("error requesting crawls", "err", err) 92 } 93 94 return nil 95}