An atproto PDS written in Go
at 0.3.4 2.2 kB view raw
1package server 2 3import ( 4 "fmt" 5 "net/http" 6 7 "github.com/bluesky-social/indigo/events" 8 "github.com/bluesky-social/indigo/lex/util" 9 "github.com/btcsuite/websocket" 10 "github.com/labstack/echo/v4" 11) 12 13var upgrader = websocket.Upgrader{ 14 ReadBufferSize: 1024, 15 WriteBufferSize: 1024, 16 CheckOrigin: func(r *http.Request) bool { 17 return true 18 }, 19} 20 21func (s *Server) handleSyncSubscribeRepos(e echo.Context) error { 22 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10) 23 if err != nil { 24 return err 25 } 26 27 s.logger.Info("new connection", "ua", e.Request().UserAgent()) 28 29 ctx := e.Request().Context() 30 31 ident := e.RealIP() + "-" + e.Request().UserAgent() 32 33 evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 34 return true 35 }, nil) 36 if err != nil { 37 return err 38 } 39 defer cancel() 40 41 header := events.EventHeader{Op: events.EvtKindMessage} 42 for evt := range evts { 43 wc, err := conn.NextWriter(websocket.BinaryMessage) 44 if err != nil { 45 return err 46 } 47 48 var obj util.CBOR 49 50 switch { 51 case evt.Error != nil: 52 header.Op = events.EvtKindErrorFrame 53 obj = evt.Error 54 case evt.RepoCommit != nil: 55 header.MsgType = "#commit" 56 obj = evt.RepoCommit 57 case evt.RepoHandle != nil: 58 header.MsgType = "#handle" 59 obj = evt.RepoHandle 60 case evt.RepoIdentity != nil: 61 header.MsgType = "#identity" 62 obj = evt.RepoIdentity 63 case evt.RepoAccount != nil: 64 header.MsgType = "#account" 65 obj = evt.RepoAccount 66 case evt.RepoInfo != nil: 67 header.MsgType = "#info" 68 obj = evt.RepoInfo 69 case evt.RepoMigrate != nil: 70 header.MsgType = "#migrate" 71 obj = evt.RepoMigrate 72 case evt.RepoTombstone != nil: 73 header.MsgType = "#tombstone" 74 obj = evt.RepoTombstone 75 default: 76 return fmt.Errorf("unrecognized event kind") 77 } 78 79 if err := header.MarshalCBOR(wc); err != nil { 80 return fmt.Errorf("failed to write header: %w", err) 81 } 82 83 if err := obj.MarshalCBOR(wc); err != nil { 84 return fmt.Errorf("failed to write event: %w", err) 85 } 86 87 if err := wc.Close(); err != nil { 88 return fmt.Errorf("failed to flush-close our event write: %w", err) 89 } 90 } 91 92 return nil 93}