An atproto PDS written in Go
at main 23 kB view raw
1package server 2 3import ( 4 "bytes" 5 "context" 6 "crypto/ecdsa" 7 "embed" 8 "errors" 9 "fmt" 10 "io" 11 "log/slog" 12 "net/http" 13 "net/smtp" 14 "os" 15 "path/filepath" 16 "sync" 17 "text/template" 18 "time" 19 20 "github.com/aws/aws-sdk-go/aws" 21 "github.com/aws/aws-sdk-go/aws/credentials" 22 "github.com/aws/aws-sdk-go/aws/session" 23 "github.com/aws/aws-sdk-go/service/s3" 24 "github.com/bluesky-social/indigo/api/atproto" 25 "github.com/bluesky-social/indigo/atproto/syntax" 26 "github.com/bluesky-social/indigo/events" 27 "github.com/bluesky-social/indigo/util" 28 "github.com/bluesky-social/indigo/xrpc" 29 "github.com/domodwyer/mailyak/v3" 30 "github.com/go-playground/validator" 31 "github.com/gorilla/sessions" 32 "github.com/haileyok/cocoon/identity" 33 "github.com/haileyok/cocoon/internal/db" 34 "github.com/haileyok/cocoon/internal/helpers" 35 "github.com/haileyok/cocoon/models" 36 "github.com/haileyok/cocoon/oauth/client" 37 "github.com/haileyok/cocoon/oauth/constants" 38 "github.com/haileyok/cocoon/oauth/dpop" 39 "github.com/haileyok/cocoon/oauth/provider" 40 "github.com/haileyok/cocoon/plc" 41 "github.com/ipfs/go-cid" 42 echo_session "github.com/labstack/echo-contrib/session" 43 "github.com/labstack/echo/v4" 44 "github.com/labstack/echo/v4/middleware" 45 slogecho "github.com/samber/slog-echo" 46 "gorm.io/driver/postgres" 47 "gorm.io/driver/sqlite" 48 "gorm.io/gorm" 49) 50 51const ( 52 AccountSessionMaxAge = 30 * 24 * time.Hour // one week 53) 54 55type S3Config struct { 56 BackupsEnabled bool 57 BlobstoreEnabled bool 58 Endpoint string 59 Region string 60 Bucket string 61 AccessKey string 62 SecretKey string 63 CDNUrl string 64} 65 66type Server struct { 67 http *http.Client 68 httpd *http.Server 69 mail *mailyak.MailYak 70 mailLk *sync.Mutex 71 echo *echo.Echo 72 db *db.DB 73 plcClient *plc.Client 74 logger *slog.Logger 75 config *config 76 privateKey *ecdsa.PrivateKey 77 repoman *RepoMan 78 oauthProvider *provider.Provider 79 evtman *events.EventManager 80 passport *identity.Passport 81 fallbackProxy string 82 83 lastRequestCrawl time.Time 84 requestCrawlMu sync.Mutex 85 86 dbName string 87 dbType string 88 s3Config *S3Config 89} 90 91type Args struct { 92 Addr string 93 DbName string 94 DbType string 95 DatabaseURL string 96 Logger *slog.Logger 97 Version string 98 Did string 99 Hostname string 100 RotationKeyPath string 101 JwkPath string 102 ContactEmail string 103 Relays []string 104 AdminPassword string 105 RequireInvite bool 106 107 SmtpUser string 108 SmtpPass string 109 SmtpHost string 110 SmtpPort string 111 SmtpEmail string 112 SmtpName string 113 114 S3Config *S3Config 115 116 SessionSecret string 117 118 BlockstoreVariant BlockstoreVariant 119 FallbackProxy string 120} 121 122type config struct { 123 Version string 124 Did string 125 Hostname string 126 ContactEmail string 127 EnforcePeering bool 128 Relays []string 129 AdminPassword string 130 RequireInvite bool 131 SmtpEmail string 132 SmtpName string 133 BlockstoreVariant BlockstoreVariant 134 FallbackProxy string 135} 136 137type CustomValidator struct { 138 validator *validator.Validate 139} 140 141type ValidationError struct { 142 error 143 Field string 144 Tag string 145} 146 147func (cv *CustomValidator) Validate(i any) error { 148 if err := cv.validator.Struct(i); err != nil { 149 var validateErrors validator.ValidationErrors 150 if errors.As(err, &validateErrors) && len(validateErrors) > 0 { 151 first := validateErrors[0] 152 return ValidationError{ 153 error: err, 154 Field: first.Field(), 155 Tag: first.Tag(), 156 } 157 } 158 159 return err 160 } 161 162 return nil 163} 164 165//go:embed templates/* 166var templateFS embed.FS 167 168//go:embed static/* 169var staticFS embed.FS 170 171type TemplateRenderer struct { 172 templates *template.Template 173 isDev bool 174 templatePath string 175} 176 177func (s *Server) loadTemplates() { 178 absPath, _ := filepath.Abs("server/templates/*.html") 179 if s.config.Version == "dev" { 180 tmpl := template.Must(template.ParseGlob(absPath)) 181 s.echo.Renderer = &TemplateRenderer{ 182 templates: tmpl, 183 isDev: true, 184 templatePath: absPath, 185 } 186 } else { 187 tmpl := template.Must(template.ParseFS(templateFS, "templates/*.html")) 188 s.echo.Renderer = &TemplateRenderer{ 189 templates: tmpl, 190 isDev: false, 191 } 192 } 193} 194 195func (t *TemplateRenderer) Render(w io.Writer, name string, data any, c echo.Context) error { 196 if t.isDev { 197 tmpl, err := template.ParseGlob(t.templatePath) 198 if err != nil { 199 return err 200 } 201 t.templates = tmpl 202 } 203 204 if viewContext, isMap := data.(map[string]any); isMap { 205 viewContext["reverse"] = c.Echo().Reverse 206 } 207 208 return t.templates.ExecuteTemplate(w, name, data) 209} 210 211func New(args *Args) (*Server, error) { 212 if args.Addr == "" { 213 return nil, fmt.Errorf("addr must be set") 214 } 215 216 if args.DbName == "" { 217 return nil, fmt.Errorf("db name must be set") 218 } 219 220 if args.Did == "" { 221 return nil, fmt.Errorf("cocoon did must be set") 222 } 223 224 if args.ContactEmail == "" { 225 return nil, fmt.Errorf("cocoon contact email is required") 226 } 227 228 if _, err := syntax.ParseDID(args.Did); err != nil { 229 return nil, fmt.Errorf("error parsing cocoon did: %w", err) 230 } 231 232 if args.Hostname == "" { 233 return nil, fmt.Errorf("cocoon hostname must be set") 234 } 235 236 if args.AdminPassword == "" { 237 return nil, fmt.Errorf("admin password must be set") 238 } 239 240 if args.Logger == nil { 241 args.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{})) 242 } 243 244 if args.SessionSecret == "" { 245 panic("SESSION SECRET WAS NOT SET. THIS IS REQUIRED. ") 246 } 247 248 e := echo.New() 249 250 e.Pre(middleware.RemoveTrailingSlash()) 251 e.Pre(slogecho.New(args.Logger)) 252 e.Use(echo_session.Middleware(sessions.NewCookieStore([]byte(args.SessionSecret)))) 253 e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ 254 AllowOrigins: []string{"*"}, 255 AllowHeaders: []string{"*"}, 256 AllowMethods: []string{"*"}, 257 AllowCredentials: true, 258 MaxAge: 100_000_000, 259 })) 260 261 vdtor := validator.New() 262 vdtor.RegisterValidation("atproto-handle", func(fl validator.FieldLevel) bool { 263 if _, err := syntax.ParseHandle(fl.Field().String()); err != nil { 264 return false 265 } 266 return true 267 }) 268 vdtor.RegisterValidation("atproto-did", func(fl validator.FieldLevel) bool { 269 if _, err := syntax.ParseDID(fl.Field().String()); err != nil { 270 return false 271 } 272 return true 273 }) 274 vdtor.RegisterValidation("atproto-rkey", func(fl validator.FieldLevel) bool { 275 if _, err := syntax.ParseRecordKey(fl.Field().String()); err != nil { 276 return false 277 } 278 return true 279 }) 280 vdtor.RegisterValidation("atproto-nsid", func(fl validator.FieldLevel) bool { 281 if _, err := syntax.ParseNSID(fl.Field().String()); err != nil { 282 return false 283 } 284 return true 285 }) 286 287 e.Validator = &CustomValidator{validator: vdtor} 288 289 httpd := &http.Server{ 290 Addr: args.Addr, 291 Handler: e, 292 // shitty defaults but okay for now, needed for import repo 293 ReadTimeout: 5 * time.Minute, 294 WriteTimeout: 5 * time.Minute, 295 IdleTimeout: 5 * time.Minute, 296 } 297 298 dbType := args.DbType 299 if dbType == "" { 300 dbType = "sqlite" 301 } 302 303 var gdb *gorm.DB 304 var err error 305 switch dbType { 306 case "postgres": 307 if args.DatabaseURL == "" { 308 return nil, fmt.Errorf("database-url must be set when using postgres") 309 } 310 gdb, err = gorm.Open(postgres.Open(args.DatabaseURL), &gorm.Config{}) 311 if err != nil { 312 return nil, fmt.Errorf("failed to connect to postgres: %w", err) 313 } 314 args.Logger.Info("connected to PostgreSQL database") 315 default: 316 gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{}) 317 if err != nil { 318 return nil, fmt.Errorf("failed to open sqlite database: %w", err) 319 } 320 args.Logger.Info("connected to SQLite database", "path", args.DbName) 321 } 322 dbw := db.NewDB(gdb) 323 324 rkbytes, err := os.ReadFile(args.RotationKeyPath) 325 if err != nil { 326 return nil, err 327 } 328 329 h := util.RobustHTTPClient() 330 331 plcClient, err := plc.NewClient(&plc.ClientArgs{ 332 H: h, 333 Service: "https://plc.directory", 334 PdsHostname: args.Hostname, 335 RotationKey: rkbytes, 336 }) 337 if err != nil { 338 return nil, err 339 } 340 341 jwkbytes, err := os.ReadFile(args.JwkPath) 342 if err != nil { 343 return nil, err 344 } 345 346 key, err := helpers.ParseJWKFromBytes(jwkbytes) 347 if err != nil { 348 return nil, err 349 } 350 351 var pkey ecdsa.PrivateKey 352 if err := key.Raw(&pkey); err != nil { 353 return nil, err 354 } 355 356 oauthCli := &http.Client{ 357 Timeout: 10 * time.Second, 358 } 359 360 var nonceSecret []byte 361 maybeSecret, err := os.ReadFile("nonce.secret") 362 if err != nil && !os.IsNotExist(err) { 363 args.Logger.Error("error attempting to read nonce secret", "error", err) 364 } else { 365 nonceSecret = maybeSecret 366 } 367 368 s := &Server{ 369 http: h, 370 httpd: httpd, 371 echo: e, 372 logger: args.Logger, 373 db: dbw, 374 plcClient: plcClient, 375 privateKey: &pkey, 376 config: &config{ 377 Version: args.Version, 378 Did: args.Did, 379 Hostname: args.Hostname, 380 ContactEmail: args.ContactEmail, 381 EnforcePeering: false, 382 Relays: args.Relays, 383 AdminPassword: args.AdminPassword, 384 RequireInvite: args.RequireInvite, 385 SmtpName: args.SmtpName, 386 SmtpEmail: args.SmtpEmail, 387 BlockstoreVariant: args.BlockstoreVariant, 388 FallbackProxy: args.FallbackProxy, 389 }, 390 evtman: events.NewEventManager(events.NewMemPersister()), 391 passport: identity.NewPassport(h, identity.NewMemCache(10_000)), 392 393 dbName: args.DbName, 394 dbType: dbType, 395 s3Config: args.S3Config, 396 397 oauthProvider: provider.NewProvider(provider.Args{ 398 Hostname: args.Hostname, 399 ClientManagerArgs: client.ManagerArgs{ 400 Cli: oauthCli, 401 Logger: args.Logger, 402 }, 403 DpopManagerArgs: dpop.ManagerArgs{ 404 NonceSecret: nonceSecret, 405 NonceRotationInterval: constants.NonceMaxRotationInterval / 3, 406 OnNonceSecretCreated: func(newNonce []byte) { 407 if err := os.WriteFile("nonce.secret", newNonce, 0644); err != nil { 408 args.Logger.Error("error writing new nonce secret", "error", err) 409 } 410 }, 411 Logger: args.Logger, 412 Hostname: args.Hostname, 413 }, 414 }), 415 } 416 417 s.loadTemplates() 418 419 s.repoman = NewRepoMan(s) // TODO: this is way too lazy, stop it 420 421 // TODO: should validate these args 422 if args.SmtpUser == "" || args.SmtpPass == "" || args.SmtpHost == "" || args.SmtpPort == "" || args.SmtpEmail == "" || args.SmtpName == "" { 423 args.Logger.Warn("not enough smtp args were provided. mailing will not work for your server.") 424 } else { 425 mail := mailyak.New(args.SmtpHost+":"+args.SmtpPort, smtp.PlainAuth("", args.SmtpUser, args.SmtpPass, args.SmtpHost)) 426 mail.From(s.config.SmtpEmail) 427 mail.FromName(s.config.SmtpName) 428 429 s.mail = mail 430 s.mailLk = &sync.Mutex{} 431 } 432 433 return s, nil 434} 435 436func (s *Server) addRoutes() { 437 // static 438 if s.config.Version == "dev" { 439 s.echo.Static("/static", "server/static") 440 } else { 441 s.echo.GET("/static/*", echo.WrapHandler(http.FileServer(http.FS(staticFS)))) 442 } 443 444 // random stuff 445 s.echo.GET("/", s.handleRoot) 446 s.echo.GET("/xrpc/_health", s.handleHealth) 447 s.echo.GET("/.well-known/did.json", s.handleWellKnown) 448 s.echo.GET("/.well-known/atproto-did", s.handleAtprotoDid) 449 s.echo.GET("/.well-known/oauth-protected-resource", s.handleOauthProtectedResource) 450 s.echo.GET("/.well-known/oauth-authorization-server", s.handleOauthAuthorizationServer) 451 s.echo.GET("/robots.txt", s.handleRobots) 452 453 // public 454 s.echo.GET("/xrpc/com.atproto.identity.resolveHandle", s.handleResolveHandle) 455 s.echo.POST("/xrpc/com.atproto.server.createAccount", s.handleCreateAccount) 456 s.echo.POST("/xrpc/com.atproto.server.createSession", s.handleCreateSession) 457 s.echo.GET("/xrpc/com.atproto.server.describeServer", s.handleDescribeServer) 458 s.echo.POST("/xrpc/com.atproto.server.reserveSigningKey", s.handleServerReserveSigningKey) 459 460 s.echo.GET("/xrpc/com.atproto.repo.describeRepo", s.handleDescribeRepo) 461 s.echo.GET("/xrpc/com.atproto.sync.listRepos", s.handleListRepos) 462 s.echo.GET("/xrpc/com.atproto.repo.listRecords", s.handleListRecords) 463 s.echo.GET("/xrpc/com.atproto.repo.getRecord", s.handleRepoGetRecord) 464 s.echo.GET("/xrpc/com.atproto.sync.getRecord", s.handleSyncGetRecord) 465 s.echo.GET("/xrpc/com.atproto.sync.getBlocks", s.handleGetBlocks) 466 s.echo.GET("/xrpc/com.atproto.sync.getLatestCommit", s.handleSyncGetLatestCommit) 467 s.echo.GET("/xrpc/com.atproto.sync.getRepoStatus", s.handleSyncGetRepoStatus) 468 s.echo.GET("/xrpc/com.atproto.sync.getRepo", s.handleSyncGetRepo) 469 s.echo.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleSyncSubscribeRepos) 470 s.echo.GET("/xrpc/com.atproto.sync.listBlobs", s.handleSyncListBlobs) 471 s.echo.GET("/xrpc/com.atproto.sync.getBlob", s.handleSyncGetBlob) 472 473 // labels 474 s.echo.GET("/xrpc/com.atproto.label.queryLabels", s.handleLabelQueryLabels) 475 476 // account 477 s.echo.GET("/account", s.handleAccount) 478 s.echo.POST("/account/revoke", s.handleAccountRevoke) 479 s.echo.GET("/account/signin", s.handleAccountSigninGet) 480 s.echo.POST("/account/signin", s.handleAccountSigninPost) 481 s.echo.GET("/account/signout", s.handleAccountSignout) 482 483 // oauth account 484 s.echo.GET("/oauth/jwks", s.handleOauthJwks) 485 s.echo.GET("/oauth/authorize", s.handleOauthAuthorizeGet) 486 s.echo.POST("/oauth/authorize", s.handleOauthAuthorizePost) 487 488 // oauth authorization 489 s.echo.POST("/oauth/par", s.handleOauthPar, s.oauthProvider.BaseMiddleware) 490 s.echo.POST("/oauth/token", s.handleOauthToken, s.oauthProvider.BaseMiddleware) 491 492 // authed 493 s.echo.GET("/xrpc/com.atproto.server.getSession", s.handleGetSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 494 s.echo.POST("/xrpc/com.atproto.server.refreshSession", s.handleRefreshSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 495 s.echo.POST("/xrpc/com.atproto.server.deleteSession", s.handleDeleteSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 496 s.echo.GET("/xrpc/com.atproto.identity.getRecommendedDidCredentials", s.handleGetRecommendedDidCredentials, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 497 s.echo.POST("/xrpc/com.atproto.identity.updateHandle", s.handleIdentityUpdateHandle, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 498 s.echo.POST("/xrpc/com.atproto.identity.requestPlcOperationSignature", s.handleIdentityRequestPlcOperationSignature, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 499 s.echo.POST("/xrpc/com.atproto.identity.signPlcOperation", s.handleSignPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 500 s.echo.POST("/xrpc/com.atproto.identity.submitPlcOperation", s.handleSubmitPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 501 s.echo.POST("/xrpc/com.atproto.server.confirmEmail", s.handleServerConfirmEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 502 s.echo.POST("/xrpc/com.atproto.server.requestEmailConfirmation", s.handleServerRequestEmailConfirmation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 503 s.echo.POST("/xrpc/com.atproto.server.requestPasswordReset", s.handleServerRequestPasswordReset) // AUTH NOT REQUIRED FOR THIS ONE 504 s.echo.POST("/xrpc/com.atproto.server.requestEmailUpdate", s.handleServerRequestEmailUpdate, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 505 s.echo.POST("/xrpc/com.atproto.server.resetPassword", s.handleServerResetPassword, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 506 s.echo.POST("/xrpc/com.atproto.server.updateEmail", s.handleServerUpdateEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 507 s.echo.GET("/xrpc/com.atproto.server.getServiceAuth", s.handleServerGetServiceAuth, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 508 s.echo.GET("/xrpc/com.atproto.server.checkAccountStatus", s.handleServerCheckAccountStatus, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 509 s.echo.POST("/xrpc/com.atproto.server.deactivateAccount", s.handleServerDeactivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 510 s.echo.POST("/xrpc/com.atproto.server.activateAccount", s.handleServerActivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 511 s.echo.POST("/xrpc/com.atproto.server.requestAccountDelete", s.handleServerRequestAccountDelete, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 512 s.echo.POST("/xrpc/com.atproto.server.deleteAccount", s.handleServerDeleteAccount) 513 514 // repo 515 s.echo.GET("/xrpc/com.atproto.repo.listMissingBlobs", s.handleListMissingBlobs, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 516 s.echo.POST("/xrpc/com.atproto.repo.createRecord", s.handleCreateRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 517 s.echo.POST("/xrpc/com.atproto.repo.putRecord", s.handlePutRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 518 s.echo.POST("/xrpc/com.atproto.repo.deleteRecord", s.handleDeleteRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 519 s.echo.POST("/xrpc/com.atproto.repo.applyWrites", s.handleApplyWrites, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 520 s.echo.POST("/xrpc/com.atproto.repo.uploadBlob", s.handleRepoUploadBlob, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 521 s.echo.POST("/xrpc/com.atproto.repo.importRepo", s.handleRepoImportRepo, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 522 523 // stupid silly endpoints 524 s.echo.GET("/xrpc/app.bsky.actor.getPreferences", s.handleActorGetPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 525 s.echo.POST("/xrpc/app.bsky.actor.putPreferences", s.handleActorPutPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 526 s.echo.GET("/xrpc/app.bsky.feed.getFeed", s.handleProxyBskyFeedGetFeed, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 527 528 // admin routes 529 s.echo.POST("/xrpc/com.atproto.server.createInviteCode", s.handleCreateInviteCode, s.handleAdminMiddleware) 530 s.echo.POST("/xrpc/com.atproto.server.createInviteCodes", s.handleCreateInviteCodes, s.handleAdminMiddleware) 531 532 // are there any routes that we should be allowing without auth? i dont think so but idk 533 s.echo.GET("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 534 s.echo.POST("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 535} 536 537func (s *Server) Serve(ctx context.Context) error { 538 s.addRoutes() 539 540 s.logger.Info("migrating...") 541 542 s.db.AutoMigrate( 543 &models.Actor{}, 544 &models.Repo{}, 545 &models.InviteCode{}, 546 &models.Token{}, 547 &models.RefreshToken{}, 548 &models.Block{}, 549 &models.Record{}, 550 &models.Blob{}, 551 &models.BlobPart{}, 552 &models.ReservedKey{}, 553 &provider.OauthToken{}, 554 &provider.OauthAuthorizationRequest{}, 555 ) 556 557 s.logger.Info("starting cocoon") 558 559 go func() { 560 if err := s.httpd.ListenAndServe(); err != nil { 561 panic(err) 562 } 563 }() 564 565 go s.backupRoutine() 566 567 go func() { 568 if err := s.requestCrawl(ctx); err != nil { 569 s.logger.Error("error requesting crawls", "err", err) 570 } 571 }() 572 573 <-ctx.Done() 574 575 fmt.Println("shut down") 576 577 return nil 578} 579 580func (s *Server) requestCrawl(ctx context.Context) error { 581 logger := s.logger.With("component", "request-crawl") 582 s.requestCrawlMu.Lock() 583 defer s.requestCrawlMu.Unlock() 584 585 logger.Info("requesting crawl with configured relays") 586 587 if time.Now().Sub(s.lastRequestCrawl) <= 1*time.Minute { 588 return fmt.Errorf("a crawl request has already been made within the last minute") 589 } 590 591 for _, relay := range s.config.Relays { 592 logger := logger.With("relay", relay) 593 logger.Info("requesting crawl from relay") 594 cli := xrpc.Client{Host: relay} 595 if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{ 596 Hostname: s.config.Hostname, 597 }); err != nil { 598 logger.Error("error requesting crawl", "err", err) 599 } else { 600 logger.Info("crawl requested successfully") 601 } 602 } 603 604 s.lastRequestCrawl = time.Now() 605 606 return nil 607} 608 609func (s *Server) doBackup() { 610 if s.dbType == "postgres" { 611 s.logger.Info("skipping S3 backup - PostgreSQL backups should be handled externally (pg_dump, managed database backups, etc.)") 612 return 613 } 614 615 start := time.Now() 616 617 s.logger.Info("beginning backup to s3...") 618 619 var buf bytes.Buffer 620 if err := func() error { 621 s.logger.Info("reading database bytes...") 622 s.db.Lock() 623 defer s.db.Unlock() 624 625 sf, err := os.Open(s.dbName) 626 if err != nil { 627 return fmt.Errorf("error opening database for backup: %w", err) 628 } 629 defer sf.Close() 630 631 if _, err := io.Copy(&buf, sf); err != nil { 632 return fmt.Errorf("error reading bytes of backup db: %w", err) 633 } 634 635 return nil 636 }(); err != nil { 637 s.logger.Error("error backing up database", "error", err) 638 return 639 } 640 641 if err := func() error { 642 s.logger.Info("sending to s3...") 643 644 currTime := time.Now().Format("2006-01-02_15-04-05") 645 key := "cocoon-backup-" + currTime + ".db" 646 647 config := &aws.Config{ 648 Region: aws.String(s.s3Config.Region), 649 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 650 } 651 652 if s.s3Config.Endpoint != "" { 653 config.Endpoint = aws.String(s.s3Config.Endpoint) 654 config.S3ForcePathStyle = aws.Bool(true) 655 } 656 657 sess, err := session.NewSession(config) 658 if err != nil { 659 return err 660 } 661 662 svc := s3.New(sess) 663 664 if _, err := svc.PutObject(&s3.PutObjectInput{ 665 Bucket: aws.String(s.s3Config.Bucket), 666 Key: aws.String(key), 667 Body: bytes.NewReader(buf.Bytes()), 668 }); err != nil { 669 return fmt.Errorf("error uploading file to s3: %w", err) 670 } 671 672 s.logger.Info("finished uploading backup to s3", "key", key, "duration", time.Now().Sub(start).Seconds()) 673 674 return nil 675 }(); err != nil { 676 s.logger.Error("error uploading database backup", "error", err) 677 return 678 } 679 680 os.WriteFile("last-backup.txt", []byte(time.Now().String()), 0644) 681} 682 683func (s *Server) backupRoutine() { 684 if s.s3Config == nil || !s.s3Config.BackupsEnabled { 685 return 686 } 687 688 if s.s3Config.Region == "" { 689 s.logger.Warn("no s3 region configured but backups are enabled. backups will not run.") 690 return 691 } 692 693 if s.s3Config.Bucket == "" { 694 s.logger.Warn("no s3 bucket configured but backups are enabled. backups will not run.") 695 return 696 } 697 698 if s.s3Config.AccessKey == "" { 699 s.logger.Warn("no s3 access key configured but backups are enabled. backups will not run.") 700 return 701 } 702 703 if s.s3Config.SecretKey == "" { 704 s.logger.Warn("no s3 secret key configured but backups are enabled. backups will not run.") 705 return 706 } 707 708 shouldBackupNow := false 709 lastBackupStr, err := os.ReadFile("last-backup.txt") 710 if err != nil { 711 shouldBackupNow = true 712 } else { 713 lastBackup, err := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", string(lastBackupStr)) 714 if err != nil { 715 shouldBackupNow = true 716 } else if time.Now().Sub(lastBackup).Seconds() > 3600 { 717 shouldBackupNow = true 718 } 719 } 720 721 if shouldBackupNow { 722 go s.doBackup() 723 } 724 725 ticker := time.NewTicker(time.Hour) 726 for range ticker.C { 727 go s.doBackup() 728 } 729} 730 731func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error { 732 if err := s.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil { 733 return err 734 } 735 736 return nil 737}