An atproto PDS written in Go
1package server 2 3import ( 4 "bytes" 5 "context" 6 "crypto/ecdsa" 7 "embed" 8 "errors" 9 "fmt" 10 "io" 11 "log/slog" 12 "net/http" 13 "net/smtp" 14 "os" 15 "path/filepath" 16 "sync" 17 "text/template" 18 "time" 19 20 "github.com/aws/aws-sdk-go/aws" 21 "github.com/aws/aws-sdk-go/aws/credentials" 22 "github.com/aws/aws-sdk-go/aws/session" 23 "github.com/aws/aws-sdk-go/service/s3" 24 "github.com/bluesky-social/indigo/api/atproto" 25 "github.com/bluesky-social/indigo/atproto/syntax" 26 "github.com/bluesky-social/indigo/events" 27 "github.com/bluesky-social/indigo/util" 28 "github.com/bluesky-social/indigo/xrpc" 29 "github.com/domodwyer/mailyak/v3" 30 "github.com/go-playground/validator" 31 "github.com/gorilla/sessions" 32 "github.com/haileyok/cocoon/identity" 33 "github.com/haileyok/cocoon/internal/db" 34 "github.com/haileyok/cocoon/internal/helpers" 35 "github.com/haileyok/cocoon/models" 36 "github.com/haileyok/cocoon/oauth/client" 37 "github.com/haileyok/cocoon/oauth/constants" 38 "github.com/haileyok/cocoon/oauth/dpop" 39 "github.com/haileyok/cocoon/oauth/provider" 40 "github.com/haileyok/cocoon/plc" 41 "github.com/ipfs/go-cid" 42 echo_session "github.com/labstack/echo-contrib/session" 43 "github.com/labstack/echo/v4" 44 "github.com/labstack/echo/v4/middleware" 45 slogecho "github.com/samber/slog-echo" 46 "gorm.io/driver/postgres" 47 "gorm.io/driver/sqlite" 48 "gorm.io/gorm" 49) 50 51const ( 52 AccountSessionMaxAge = 30 * 24 * time.Hour // one week 53) 54 55type S3Config struct { 56 BackupsEnabled bool 57 BlobstoreEnabled bool 58 Endpoint string 59 Region string 60 Bucket string 61 AccessKey string 62 SecretKey string 63 CDNUrl string 64} 65 66type Server struct { 67 http *http.Client 68 httpd *http.Server 69 mail *mailyak.MailYak 70 mailLk *sync.Mutex 71 echo *echo.Echo 72 db *db.DB 73 plcClient *plc.Client 74 logger *slog.Logger 75 config *config 76 privateKey *ecdsa.PrivateKey 77 repoman *RepoMan 78 oauthProvider *provider.Provider 79 evtman *events.EventManager 80 passport *identity.Passport 81 fallbackProxy string 82 83 lastRequestCrawl time.Time 84 requestCrawlMu sync.Mutex 85 86 dbName string 87 dbType string 88 s3Config *S3Config 89} 90 91type Args struct { 92 Addr string 93 DbName string 94 DbType string 95 DatabaseURL string 96 Logger *slog.Logger 97 Version string 98 Did string 99 Hostname string 100 RotationKeyPath string 101 JwkPath string 102 ContactEmail string 103 Relays []string 104 AdminPassword string 105 106 SmtpUser string 107 SmtpPass string 108 SmtpHost string 109 SmtpPort string 110 SmtpEmail string 111 SmtpName string 112 113 S3Config *S3Config 114 115 SessionSecret string 116 117 BlockstoreVariant BlockstoreVariant 118 FallbackProxy string 119} 120 121type config struct { 122 Version string 123 Did string 124 Hostname string 125 ContactEmail string 126 EnforcePeering bool 127 Relays []string 128 AdminPassword string 129 SmtpEmail string 130 SmtpName string 131 BlockstoreVariant BlockstoreVariant 132 FallbackProxy string 133} 134 135type CustomValidator struct { 136 validator *validator.Validate 137} 138 139type ValidationError struct { 140 error 141 Field string 142 Tag string 143} 144 145func (cv *CustomValidator) Validate(i any) error { 146 if err := cv.validator.Struct(i); err != nil { 147 var validateErrors validator.ValidationErrors 148 if errors.As(err, &validateErrors) && len(validateErrors) > 0 { 149 first := validateErrors[0] 150 return ValidationError{ 151 error: err, 152 Field: first.Field(), 153 Tag: first.Tag(), 154 } 155 } 156 157 return err 158 } 159 160 return nil 161} 162 163//go:embed templates/* 164var templateFS embed.FS 165 166//go:embed static/* 167var staticFS embed.FS 168 169type TemplateRenderer struct { 170 templates *template.Template 171 isDev bool 172 templatePath string 173} 174 175func (s *Server) loadTemplates() { 176 absPath, _ := filepath.Abs("server/templates/*.html") 177 if s.config.Version == "dev" { 178 tmpl := template.Must(template.ParseGlob(absPath)) 179 s.echo.Renderer = &TemplateRenderer{ 180 templates: tmpl, 181 isDev: true, 182 templatePath: absPath, 183 } 184 } else { 185 tmpl := template.Must(template.ParseFS(templateFS, "templates/*.html")) 186 s.echo.Renderer = &TemplateRenderer{ 187 templates: tmpl, 188 isDev: false, 189 } 190 } 191} 192 193func (t *TemplateRenderer) Render(w io.Writer, name string, data any, c echo.Context) error { 194 if t.isDev { 195 tmpl, err := template.ParseGlob(t.templatePath) 196 if err != nil { 197 return err 198 } 199 t.templates = tmpl 200 } 201 202 if viewContext, isMap := data.(map[string]any); isMap { 203 viewContext["reverse"] = c.Echo().Reverse 204 } 205 206 return t.templates.ExecuteTemplate(w, name, data) 207} 208 209func New(args *Args) (*Server, error) { 210 if args.Addr == "" { 211 return nil, fmt.Errorf("addr must be set") 212 } 213 214 if args.DbName == "" { 215 return nil, fmt.Errorf("db name must be set") 216 } 217 218 if args.Did == "" { 219 return nil, fmt.Errorf("cocoon did must be set") 220 } 221 222 if args.ContactEmail == "" { 223 return nil, fmt.Errorf("cocoon contact email is required") 224 } 225 226 if _, err := syntax.ParseDID(args.Did); err != nil { 227 return nil, fmt.Errorf("error parsing cocoon did: %w", err) 228 } 229 230 if args.Hostname == "" { 231 return nil, fmt.Errorf("cocoon hostname must be set") 232 } 233 234 if args.AdminPassword == "" { 235 return nil, fmt.Errorf("admin password must be set") 236 } 237 238 if args.Logger == nil { 239 args.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{})) 240 } 241 242 if args.SessionSecret == "" { 243 panic("SESSION SECRET WAS NOT SET. THIS IS REQUIRED. ") 244 } 245 246 e := echo.New() 247 248 e.Pre(middleware.RemoveTrailingSlash()) 249 e.Pre(slogecho.New(args.Logger)) 250 e.Use(echo_session.Middleware(sessions.NewCookieStore([]byte(args.SessionSecret)))) 251 e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ 252 AllowOrigins: []string{"*"}, 253 AllowHeaders: []string{"*"}, 254 AllowMethods: []string{"*"}, 255 AllowCredentials: true, 256 MaxAge: 100_000_000, 257 })) 258 259 vdtor := validator.New() 260 vdtor.RegisterValidation("atproto-handle", func(fl validator.FieldLevel) bool { 261 if _, err := syntax.ParseHandle(fl.Field().String()); err != nil { 262 return false 263 } 264 return true 265 }) 266 vdtor.RegisterValidation("atproto-did", func(fl validator.FieldLevel) bool { 267 if _, err := syntax.ParseDID(fl.Field().String()); err != nil { 268 return false 269 } 270 return true 271 }) 272 vdtor.RegisterValidation("atproto-rkey", func(fl validator.FieldLevel) bool { 273 if _, err := syntax.ParseRecordKey(fl.Field().String()); err != nil { 274 return false 275 } 276 return true 277 }) 278 vdtor.RegisterValidation("atproto-nsid", func(fl validator.FieldLevel) bool { 279 if _, err := syntax.ParseNSID(fl.Field().String()); err != nil { 280 return false 281 } 282 return true 283 }) 284 285 e.Validator = &CustomValidator{validator: vdtor} 286 287 httpd := &http.Server{ 288 Addr: args.Addr, 289 Handler: e, 290 // shitty defaults but okay for now, needed for import repo 291 ReadTimeout: 5 * time.Minute, 292 WriteTimeout: 5 * time.Minute, 293 IdleTimeout: 5 * time.Minute, 294 } 295 296 dbType := args.DbType 297 if dbType == "" { 298 dbType = "sqlite" 299 } 300 301 var gdb *gorm.DB 302 var err error 303 switch dbType { 304 case "postgres": 305 if args.DatabaseURL == "" { 306 return nil, fmt.Errorf("database-url must be set when using postgres") 307 } 308 gdb, err = gorm.Open(postgres.Open(args.DatabaseURL), &gorm.Config{}) 309 if err != nil { 310 return nil, fmt.Errorf("failed to connect to postgres: %w", err) 311 } 312 args.Logger.Info("connected to PostgreSQL database") 313 default: 314 gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{}) 315 if err != nil { 316 return nil, fmt.Errorf("failed to open sqlite database: %w", err) 317 } 318 args.Logger.Info("connected to SQLite database", "path", args.DbName) 319 } 320 dbw := db.NewDB(gdb) 321 322 rkbytes, err := os.ReadFile(args.RotationKeyPath) 323 if err != nil { 324 return nil, err 325 } 326 327 h := util.RobustHTTPClient() 328 329 plcClient, err := plc.NewClient(&plc.ClientArgs{ 330 H: h, 331 Service: "https://plc.directory", 332 PdsHostname: args.Hostname, 333 RotationKey: rkbytes, 334 }) 335 if err != nil { 336 return nil, err 337 } 338 339 jwkbytes, err := os.ReadFile(args.JwkPath) 340 if err != nil { 341 return nil, err 342 } 343 344 key, err := helpers.ParseJWKFromBytes(jwkbytes) 345 if err != nil { 346 return nil, err 347 } 348 349 var pkey ecdsa.PrivateKey 350 if err := key.Raw(&pkey); err != nil { 351 return nil, err 352 } 353 354 oauthCli := &http.Client{ 355 Timeout: 10 * time.Second, 356 } 357 358 var nonceSecret []byte 359 maybeSecret, err := os.ReadFile("nonce.secret") 360 if err != nil && !os.IsNotExist(err) { 361 args.Logger.Error("error attempting to read nonce secret", "error", err) 362 } else { 363 nonceSecret = maybeSecret 364 } 365 366 s := &Server{ 367 http: h, 368 httpd: httpd, 369 echo: e, 370 logger: args.Logger, 371 db: dbw, 372 plcClient: plcClient, 373 privateKey: &pkey, 374 config: &config{ 375 Version: args.Version, 376 Did: args.Did, 377 Hostname: args.Hostname, 378 ContactEmail: args.ContactEmail, 379 EnforcePeering: false, 380 Relays: args.Relays, 381 AdminPassword: args.AdminPassword, 382 SmtpName: args.SmtpName, 383 SmtpEmail: args.SmtpEmail, 384 BlockstoreVariant: args.BlockstoreVariant, 385 FallbackProxy: args.FallbackProxy, 386 }, 387 evtman: events.NewEventManager(events.NewMemPersister()), 388 passport: identity.NewPassport(h, identity.NewMemCache(10_000)), 389 390 dbName: args.DbName, 391 dbType: dbType, 392 s3Config: args.S3Config, 393 394 oauthProvider: provider.NewProvider(provider.Args{ 395 Hostname: args.Hostname, 396 ClientManagerArgs: client.ManagerArgs{ 397 Cli: oauthCli, 398 Logger: args.Logger, 399 }, 400 DpopManagerArgs: dpop.ManagerArgs{ 401 NonceSecret: nonceSecret, 402 NonceRotationInterval: constants.NonceMaxRotationInterval / 3, 403 OnNonceSecretCreated: func(newNonce []byte) { 404 if err := os.WriteFile("nonce.secret", newNonce, 0644); err != nil { 405 args.Logger.Error("error writing new nonce secret", "error", err) 406 } 407 }, 408 Logger: args.Logger, 409 Hostname: args.Hostname, 410 }, 411 }), 412 } 413 414 s.loadTemplates() 415 416 s.repoman = NewRepoMan(s) // TODO: this is way too lazy, stop it 417 418 // TODO: should validate these args 419 if args.SmtpUser == "" || args.SmtpPass == "" || args.SmtpHost == "" || args.SmtpPort == "" || args.SmtpEmail == "" || args.SmtpName == "" { 420 args.Logger.Warn("not enough smtp args were provided. mailing will not work for your server.") 421 } else { 422 mail := mailyak.New(args.SmtpHost+":"+args.SmtpPort, smtp.PlainAuth("", args.SmtpUser, args.SmtpPass, args.SmtpHost)) 423 mail.From(s.config.SmtpEmail) 424 mail.FromName(s.config.SmtpName) 425 426 s.mail = mail 427 s.mailLk = &sync.Mutex{} 428 } 429 430 return s, nil 431} 432 433func (s *Server) addRoutes() { 434 // static 435 if s.config.Version == "dev" { 436 s.echo.Static("/static", "server/static") 437 } else { 438 s.echo.GET("/static/*", echo.WrapHandler(http.FileServer(http.FS(staticFS)))) 439 } 440 441 // random stuff 442 s.echo.GET("/", s.handleRoot) 443 s.echo.GET("/xrpc/_health", s.handleHealth) 444 s.echo.GET("/.well-known/did.json", s.handleWellKnown) 445 s.echo.GET("/.well-known/oauth-protected-resource", s.handleOauthProtectedResource) 446 s.echo.GET("/.well-known/oauth-authorization-server", s.handleOauthAuthorizationServer) 447 s.echo.GET("/robots.txt", s.handleRobots) 448 449 // public 450 s.echo.GET("/xrpc/com.atproto.identity.resolveHandle", s.handleResolveHandle) 451 s.echo.POST("/xrpc/com.atproto.server.createAccount", s.handleCreateAccount) 452 s.echo.POST("/xrpc/com.atproto.server.createSession", s.handleCreateSession) 453 s.echo.GET("/xrpc/com.atproto.server.describeServer", s.handleDescribeServer) 454 s.echo.POST("/xrpc/com.atproto.server.reserveSigningKey", s.handleServerReserveSigningKey) 455 456 s.echo.GET("/xrpc/com.atproto.repo.describeRepo", s.handleDescribeRepo) 457 s.echo.GET("/xrpc/com.atproto.sync.listRepos", s.handleListRepos) 458 s.echo.GET("/xrpc/com.atproto.repo.listRecords", s.handleListRecords) 459 s.echo.GET("/xrpc/com.atproto.repo.getRecord", s.handleRepoGetRecord) 460 s.echo.GET("/xrpc/com.atproto.sync.getRecord", s.handleSyncGetRecord) 461 s.echo.GET("/xrpc/com.atproto.sync.getBlocks", s.handleGetBlocks) 462 s.echo.GET("/xrpc/com.atproto.sync.getLatestCommit", s.handleSyncGetLatestCommit) 463 s.echo.GET("/xrpc/com.atproto.sync.getRepoStatus", s.handleSyncGetRepoStatus) 464 s.echo.GET("/xrpc/com.atproto.sync.getRepo", s.handleSyncGetRepo) 465 s.echo.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleSyncSubscribeRepos) 466 s.echo.GET("/xrpc/com.atproto.sync.listBlobs", s.handleSyncListBlobs) 467 s.echo.GET("/xrpc/com.atproto.sync.getBlob", s.handleSyncGetBlob) 468 469 // account 470 s.echo.GET("/account", s.handleAccount) 471 s.echo.POST("/account/revoke", s.handleAccountRevoke) 472 s.echo.GET("/account/signin", s.handleAccountSigninGet) 473 s.echo.POST("/account/signin", s.handleAccountSigninPost) 474 s.echo.GET("/account/signout", s.handleAccountSignout) 475 476 // oauth account 477 s.echo.GET("/oauth/jwks", s.handleOauthJwks) 478 s.echo.GET("/oauth/authorize", s.handleOauthAuthorizeGet) 479 s.echo.POST("/oauth/authorize", s.handleOauthAuthorizePost) 480 481 // oauth authorization 482 s.echo.POST("/oauth/par", s.handleOauthPar, s.oauthProvider.BaseMiddleware) 483 s.echo.POST("/oauth/token", s.handleOauthToken, s.oauthProvider.BaseMiddleware) 484 485 // authed 486 s.echo.GET("/xrpc/com.atproto.server.getSession", s.handleGetSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 487 s.echo.POST("/xrpc/com.atproto.server.refreshSession", s.handleRefreshSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 488 s.echo.POST("/xrpc/com.atproto.server.deleteSession", s.handleDeleteSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 489 s.echo.GET("/xrpc/com.atproto.identity.getRecommendedDidCredentials", s.handleGetRecommendedDidCredentials, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 490 s.echo.POST("/xrpc/com.atproto.identity.updateHandle", s.handleIdentityUpdateHandle, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 491 s.echo.POST("/xrpc/com.atproto.identity.requestPlcOperationSignature", s.handleIdentityRequestPlcOperationSignature, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 492 s.echo.POST("/xrpc/com.atproto.identity.signPlcOperation", s.handleSignPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 493 s.echo.POST("/xrpc/com.atproto.identity.submitPlcOperation", s.handleSubmitPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 494 s.echo.POST("/xrpc/com.atproto.server.confirmEmail", s.handleServerConfirmEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 495 s.echo.POST("/xrpc/com.atproto.server.requestEmailConfirmation", s.handleServerRequestEmailConfirmation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 496 s.echo.POST("/xrpc/com.atproto.server.requestPasswordReset", s.handleServerRequestPasswordReset) // AUTH NOT REQUIRED FOR THIS ONE 497 s.echo.POST("/xrpc/com.atproto.server.requestEmailUpdate", s.handleServerRequestEmailUpdate, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 498 s.echo.POST("/xrpc/com.atproto.server.resetPassword", s.handleServerResetPassword, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 499 s.echo.POST("/xrpc/com.atproto.server.updateEmail", s.handleServerUpdateEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 500 s.echo.GET("/xrpc/com.atproto.server.getServiceAuth", s.handleServerGetServiceAuth, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 501 s.echo.GET("/xrpc/com.atproto.server.checkAccountStatus", s.handleServerCheckAccountStatus, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 502 s.echo.POST("/xrpc/com.atproto.server.deactivateAccount", s.handleServerDeactivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 503 s.echo.POST("/xrpc/com.atproto.server.activateAccount", s.handleServerActivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 504 s.echo.POST("/xrpc/com.atproto.server.requestAccountDelete", s.handleServerRequestAccountDelete, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 505 s.echo.POST("/xrpc/com.atproto.server.deleteAccount", s.handleServerDeleteAccount) 506 507 // repo 508 s.echo.GET("/xrpc/com.atproto.repo.listMissingBlobs", s.handleListMissingBlobs, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 509 s.echo.POST("/xrpc/com.atproto.repo.createRecord", s.handleCreateRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 510 s.echo.POST("/xrpc/com.atproto.repo.putRecord", s.handlePutRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 511 s.echo.POST("/xrpc/com.atproto.repo.deleteRecord", s.handleDeleteRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 512 s.echo.POST("/xrpc/com.atproto.repo.applyWrites", s.handleApplyWrites, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 513 s.echo.POST("/xrpc/com.atproto.repo.uploadBlob", s.handleRepoUploadBlob, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 514 s.echo.POST("/xrpc/com.atproto.repo.importRepo", s.handleRepoImportRepo, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 515 516 // stupid silly endpoints 517 s.echo.GET("/xrpc/app.bsky.actor.getPreferences", s.handleActorGetPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 518 s.echo.POST("/xrpc/app.bsky.actor.putPreferences", s.handleActorPutPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 519 s.echo.GET("/xrpc/app.bsky.feed.getFeed", s.handleProxyBskyFeedGetFeed, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 520 521 // admin routes 522 s.echo.POST("/xrpc/com.atproto.server.createInviteCode", s.handleCreateInviteCode, s.handleAdminMiddleware) 523 s.echo.POST("/xrpc/com.atproto.server.createInviteCodes", s.handleCreateInviteCodes, s.handleAdminMiddleware) 524 525 // are there any routes that we should be allowing without auth? i dont think so but idk 526 s.echo.GET("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 527 s.echo.POST("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 528} 529 530func (s *Server) Serve(ctx context.Context) error { 531 s.addRoutes() 532 533 s.logger.Info("migrating...") 534 535 s.db.AutoMigrate( 536 &models.Actor{}, 537 &models.Repo{}, 538 &models.InviteCode{}, 539 &models.Token{}, 540 &models.RefreshToken{}, 541 &models.Block{}, 542 &models.Record{}, 543 &models.Blob{}, 544 &models.BlobPart{}, 545 &models.ReservedKey{}, 546 &provider.OauthToken{}, 547 &provider.OauthAuthorizationRequest{}, 548 ) 549 550 s.logger.Info("starting cocoon") 551 552 go func() { 553 if err := s.httpd.ListenAndServe(); err != nil { 554 panic(err) 555 } 556 }() 557 558 go s.backupRoutine() 559 560 go func() { 561 if err := s.requestCrawl(ctx); err != nil { 562 s.logger.Error("error requesting crawls", "err", err) 563 } 564 }() 565 566 <-ctx.Done() 567 568 fmt.Println("shut down") 569 570 return nil 571} 572 573func (s *Server) requestCrawl(ctx context.Context) error { 574 logger := s.logger.With("component", "request-crawl") 575 s.requestCrawlMu.Lock() 576 defer s.requestCrawlMu.Unlock() 577 578 logger.Info("requesting crawl with configured relays") 579 580 if time.Now().Sub(s.lastRequestCrawl) <= 1*time.Minute { 581 return fmt.Errorf("a crawl request has already been made within the last minute") 582 } 583 584 for _, relay := range s.config.Relays { 585 logger := logger.With("relay", relay) 586 logger.Info("requesting crawl from relay") 587 cli := xrpc.Client{Host: relay} 588 if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{ 589 Hostname: s.config.Hostname, 590 }); err != nil { 591 logger.Error("error requesting crawl", "err", err) 592 } else { 593 logger.Info("crawl requested successfully") 594 } 595 } 596 597 s.lastRequestCrawl = time.Now() 598 599 return nil 600} 601 602func (s *Server) doBackup() { 603 if s.dbType == "postgres" { 604 s.logger.Info("skipping S3 backup - PostgreSQL backups should be handled externally (pg_dump, managed database backups, etc.)") 605 return 606 } 607 608 start := time.Now() 609 610 s.logger.Info("beginning backup to s3...") 611 612 var buf bytes.Buffer 613 if err := func() error { 614 s.logger.Info("reading database bytes...") 615 s.db.Lock() 616 defer s.db.Unlock() 617 618 sf, err := os.Open(s.dbName) 619 if err != nil { 620 return fmt.Errorf("error opening database for backup: %w", err) 621 } 622 defer sf.Close() 623 624 if _, err := io.Copy(&buf, sf); err != nil { 625 return fmt.Errorf("error reading bytes of backup db: %w", err) 626 } 627 628 return nil 629 }(); err != nil { 630 s.logger.Error("error backing up database", "error", err) 631 return 632 } 633 634 if err := func() error { 635 s.logger.Info("sending to s3...") 636 637 currTime := time.Now().Format("2006-01-02_15-04-05") 638 key := "cocoon-backup-" + currTime + ".db" 639 640 config := &aws.Config{ 641 Region: aws.String(s.s3Config.Region), 642 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 643 } 644 645 if s.s3Config.Endpoint != "" { 646 config.Endpoint = aws.String(s.s3Config.Endpoint) 647 config.S3ForcePathStyle = aws.Bool(true) 648 } 649 650 sess, err := session.NewSession(config) 651 if err != nil { 652 return err 653 } 654 655 svc := s3.New(sess) 656 657 if _, err := svc.PutObject(&s3.PutObjectInput{ 658 Bucket: aws.String(s.s3Config.Bucket), 659 Key: aws.String(key), 660 Body: bytes.NewReader(buf.Bytes()), 661 }); err != nil { 662 return fmt.Errorf("error uploading file to s3: %w", err) 663 } 664 665 s.logger.Info("finished uploading backup to s3", "key", key, "duration", time.Now().Sub(start).Seconds()) 666 667 return nil 668 }(); err != nil { 669 s.logger.Error("error uploading database backup", "error", err) 670 return 671 } 672 673 os.WriteFile("last-backup.txt", []byte(time.Now().String()), 0644) 674} 675 676func (s *Server) backupRoutine() { 677 if s.s3Config == nil || !s.s3Config.BackupsEnabled { 678 return 679 } 680 681 if s.s3Config.Region == "" { 682 s.logger.Warn("no s3 region configured but backups are enabled. backups will not run.") 683 return 684 } 685 686 if s.s3Config.Bucket == "" { 687 s.logger.Warn("no s3 bucket configured but backups are enabled. backups will not run.") 688 return 689 } 690 691 if s.s3Config.AccessKey == "" { 692 s.logger.Warn("no s3 access key configured but backups are enabled. backups will not run.") 693 return 694 } 695 696 if s.s3Config.SecretKey == "" { 697 s.logger.Warn("no s3 secret key configured but backups are enabled. backups will not run.") 698 return 699 } 700 701 shouldBackupNow := false 702 lastBackupStr, err := os.ReadFile("last-backup.txt") 703 if err != nil { 704 shouldBackupNow = true 705 } else { 706 lastBackup, err := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", string(lastBackupStr)) 707 if err != nil { 708 shouldBackupNow = true 709 } else if time.Now().Sub(lastBackup).Seconds() > 3600 { 710 shouldBackupNow = true 711 } 712 } 713 714 if shouldBackupNow { 715 go s.doBackup() 716 } 717 718 ticker := time.NewTicker(time.Hour) 719 for range ticker.C { 720 go s.doBackup() 721 } 722} 723 724func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error { 725 if err := s.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil { 726 return err 727 } 728 729 return nil 730}