An atproto PDS written in Go
1package server 2 3import ( 4 "fmt" 5 6 "github.com/bluesky-social/indigo/events" 7 "github.com/bluesky-social/indigo/lex/util" 8 "github.com/btcsuite/websocket" 9 "github.com/labstack/echo/v4" 10) 11 12func (s *Server) handleSyncSubscribeRepos(e echo.Context) error { 13 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10) 14 if err != nil { 15 return err 16 } 17 18 s.logger.Info("new connection", "ua", e.Request().UserAgent()) 19 20 ctx := e.Request().Context() 21 22 ident := e.RealIP() + "-" + e.Request().UserAgent() 23 24 evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 25 return true 26 }, nil) 27 if err != nil { 28 return err 29 } 30 defer cancel() 31 32 header := events.EventHeader{Op: events.EvtKindMessage} 33 for evt := range evts { 34 wc, err := conn.NextWriter(websocket.BinaryMessage) 35 if err != nil { 36 return err 37 } 38 39 var obj util.CBOR 40 41 switch { 42 case evt.Error != nil: 43 header.Op = events.EvtKindErrorFrame 44 obj = evt.Error 45 case evt.RepoCommit != nil: 46 header.MsgType = "#commit" 47 obj = evt.RepoCommit 48 case evt.RepoIdentity != nil: 49 header.MsgType = "#identity" 50 obj = evt.RepoIdentity 51 case evt.RepoAccount != nil: 52 header.MsgType = "#account" 53 obj = evt.RepoAccount 54 case evt.RepoInfo != nil: 55 header.MsgType = "#info" 56 obj = evt.RepoInfo 57 default: 58 return fmt.Errorf("unrecognized event kind") 59 } 60 61 if err := header.MarshalCBOR(wc); err != nil { 62 return fmt.Errorf("failed to write header: %w", err) 63 } 64 65 if err := obj.MarshalCBOR(wc); err != nil { 66 return fmt.Errorf("failed to write event: %w", err) 67 } 68 69 if err := wc.Close(); err != nil { 70 return fmt.Errorf("failed to flush-close our event write: %w", err) 71 } 72 } 73 74 return nil 75}