1package server
2
3import (
4 "bytes"
5 "context"
6 "crypto/ecdsa"
7 "embed"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "net/http"
13 "net/smtp"
14 "os"
15 "path/filepath"
16 "sync"
17 "text/template"
18 "time"
19
20 "github.com/aws/aws-sdk-go/aws"
21 "github.com/aws/aws-sdk-go/aws/credentials"
22 "github.com/aws/aws-sdk-go/aws/session"
23 "github.com/aws/aws-sdk-go/service/s3"
24 "github.com/bluesky-social/indigo/api/atproto"
25 "github.com/bluesky-social/indigo/atproto/syntax"
26 "github.com/bluesky-social/indigo/events"
27 "github.com/bluesky-social/indigo/util"
28 "github.com/bluesky-social/indigo/xrpc"
29 "github.com/domodwyer/mailyak/v3"
30 "github.com/go-playground/validator"
31 "github.com/gorilla/sessions"
32 "github.com/haileyok/cocoon/identity"
33 "github.com/haileyok/cocoon/internal/db"
34 "github.com/haileyok/cocoon/internal/helpers"
35 "github.com/haileyok/cocoon/models"
36 "github.com/haileyok/cocoon/oauth/client"
37 "github.com/haileyok/cocoon/oauth/constants"
38 "github.com/haileyok/cocoon/oauth/dpop"
39 "github.com/haileyok/cocoon/oauth/provider"
40 "github.com/haileyok/cocoon/plc"
41 "github.com/ipfs/go-cid"
42 echo_session "github.com/labstack/echo-contrib/session"
43 "github.com/labstack/echo/v4"
44 "github.com/labstack/echo/v4/middleware"
45 slogecho "github.com/samber/slog-echo"
46 "gorm.io/driver/postgres"
47 "gorm.io/driver/sqlite"
48 "gorm.io/gorm"
49)
50
const (
	// AccountSessionMaxAge is the maximum age of an account session.
	// NOTE(review): where this is applied is outside this part of the file.
	AccountSessionMaxAge = 30 * 24 * time.Hour // 30 days
)
54
// S3Config holds the S3 connection settings and feature toggles for the server.
type S3Config struct {
	// BackupsEnabled turns on the periodic database backup routine.
	BackupsEnabled bool
	// BlobstoreEnabled is not read in this part of the file; presumably it
	// enables S3-backed blob storage — confirm against its users.
	BlobstoreEnabled bool
	// Endpoint is an optional custom S3 endpoint. When non-empty, backups use
	// it together with path-style addressing.
	Endpoint string
	// Region, Bucket, AccessKey and SecretKey must all be non-empty for the
	// backup routine to run.
	Region    string
	Bucket    string
	AccessKey string
	SecretKey string
	// CDNUrl is not read in this part of the file; presumably a public base
	// URL for stored objects — confirm against its users.
	CDNUrl string
}
65
// Server bundles the HTTP server, the echo router, the database handle and the
// supporting clients that New wires together.
type Server struct {
	// http is the shared outbound HTTP client; New also hands it to the PLC
	// client and the identity passport.
	http *http.Client
	// httpd is the net/http server whose handler is the echo router.
	httpd *http.Server
	// mail and mailLk are only set when New receives a complete set of SMTP
	// arguments; otherwise both stay nil.
	mail   *mailyak.MailYak
	mailLk *sync.Mutex
	// echo is the router; routes are registered by addRoutes.
	echo      *echo.Echo
	db        *db.DB
	plcClient *plc.Client
	logger    *slog.Logger
	config    *config
	// privateKey is the ECDSA key parsed from the JWK file given to New.
	privateKey    *ecdsa.PrivateKey
	repoman       *RepoMan
	oauthProvider *provider.Provider
	evtman        *events.EventManager
	passport      *identity.Passport
	// fallbackProxy is not assigned by New in this part of the file (New
	// stores the value in config.FallbackProxy instead).
	fallbackProxy string

	// lastRequestCrawl is the time of the last completed crawl request round;
	// requestCrawlMu guards it and serializes requestCrawl.
	lastRequestCrawl time.Time
	requestCrawlMu   sync.Mutex

	// dbName is the database name given to New; doBackup opens it as a file
	// path. dbType is "postgres" or the sqlite default. s3Config may be nil.
	dbName   string
	dbType   string
	s3Config *S3Config
}
90
// Args is the input to New.
type Args struct {
	// Addr is the listen address for the HTTP server. Must be non-empty.
	Addr string
	// DbName is passed to the sqlite driver when sqlite is used. New requires
	// it to be non-empty regardless of DbType.
	DbName string
	// DbType selects the database driver: "postgres" uses DatabaseURL, any
	// other value falls through to sqlite. Empty is recorded as "sqlite".
	DbType string
	// DatabaseURL is required when DbType is "postgres".
	DatabaseURL string
	// Logger defaults to a text handler writing to stdout when nil.
	Logger  *slog.Logger
	Version string
	// Did must be a syntactically valid DID.
	Did string
	// Hostname must be non-empty.
	Hostname string
	// RotationKeyPath and JwkPath are files that New reads at startup.
	RotationKeyPath string
	JwkPath         string
	// ContactEmail must be non-empty.
	ContactEmail string
	// Relays are the relay hosts asked to crawl this server at startup.
	Relays []string
	// AdminPassword must be non-empty.
	AdminPassword string

	// All six SMTP fields must be non-empty for mailing to be enabled;
	// otherwise New logs a warning and leaves mailing unconfigured.
	SmtpUser  string
	SmtpPass  string
	SmtpHost  string
	SmtpPort  string
	SmtpEmail string
	SmtpName  string

	// S3Config may be nil, in which case the backup routine does nothing.
	S3Config *S3Config

	// SessionSecret keys the cookie session store. Must be non-empty.
	SessionSecret string

	BlockstoreVariant BlockstoreVariant
	FallbackProxy     string
}
120
// config is the subset of Args that the server keeps after construction.
type config struct {
	// Version equal to "dev" makes templates and static files load from disk
	// instead of the embedded file systems.
	Version      string
	Did          string
	Hostname     string
	ContactEmail string
	// EnforcePeering is always set to false by New.
	EnforcePeering bool
	// Relays are the hosts contacted by requestCrawl.
	Relays        []string
	AdminPassword string
	// SmtpEmail and SmtpName are used as the mail sender address and name.
	SmtpEmail         string
	SmtpName          string
	BlockstoreVariant BlockstoreVariant
	FallbackProxy     string
}
134
// CustomValidator wraps a validator.Validate so it can be installed as the
// echo instance's Validator (see New).
type CustomValidator struct {
	validator *validator.Validate
}
138
// ValidationError is returned by CustomValidator.Validate for field-level
// failures. It embeds the original validation error and exposes the field name
// and failed tag of the first reported failure.
type ValidationError struct {
	error
	Field string
	Tag   string
}
144
145func (cv *CustomValidator) Validate(i any) error {
146 if err := cv.validator.Struct(i); err != nil {
147 var validateErrors validator.ValidationErrors
148 if errors.As(err, &validateErrors) && len(validateErrors) > 0 {
149 first := validateErrors[0]
150 return ValidationError{
151 error: err,
152 Field: first.Field(),
153 Tag: first.Tag(),
154 }
155 }
156
157 return err
158 }
159
160 return nil
161}
162
// templateFS holds the contents of the templates directory embedded into the
// binary; loadTemplates parses "templates/*.html" from it outside of dev mode.
//
//go:embed templates/*
var templateFS embed.FS

// staticFS holds the contents of the static directory embedded into the
// binary; addRoutes serves it under /static outside of dev mode.
//
//go:embed static/*
var staticFS embed.FS
168
// TemplateRenderer is the echo renderer installed by loadTemplates.
type TemplateRenderer struct {
	// templates is the parsed template set used to execute named templates.
	templates *template.Template
	// isDev makes Render re-parse the templates from templatePath on each call.
	isDev bool
	// templatePath is the glob pattern used for re-parsing; only set in dev mode.
	templatePath string
}
174
175func (s *Server) loadTemplates() {
176 absPath, _ := filepath.Abs("server/templates/*.html")
177 if s.config.Version == "dev" {
178 tmpl := template.Must(template.ParseGlob(absPath))
179 s.echo.Renderer = &TemplateRenderer{
180 templates: tmpl,
181 isDev: true,
182 templatePath: absPath,
183 }
184 } else {
185 tmpl := template.Must(template.ParseFS(templateFS, "templates/*.html"))
186 s.echo.Renderer = &TemplateRenderer{
187 templates: tmpl,
188 isDev: false,
189 }
190 }
191}
192
193func (t *TemplateRenderer) Render(w io.Writer, name string, data any, c echo.Context) error {
194 if t.isDev {
195 tmpl, err := template.ParseGlob(t.templatePath)
196 if err != nil {
197 return err
198 }
199 t.templates = tmpl
200 }
201
202 if viewContext, isMap := data.(map[string]any); isMap {
203 viewContext["reverse"] = c.Echo().Reverse
204 }
205
206 return t.templates.ExecuteTemplate(w, name, data)
207}
208
209func New(args *Args) (*Server, error) {
210 if args.Addr == "" {
211 return nil, fmt.Errorf("addr must be set")
212 }
213
214 if args.DbName == "" {
215 return nil, fmt.Errorf("db name must be set")
216 }
217
218 if args.Did == "" {
219 return nil, fmt.Errorf("cocoon did must be set")
220 }
221
222 if args.ContactEmail == "" {
223 return nil, fmt.Errorf("cocoon contact email is required")
224 }
225
226 if _, err := syntax.ParseDID(args.Did); err != nil {
227 return nil, fmt.Errorf("error parsing cocoon did: %w", err)
228 }
229
230 if args.Hostname == "" {
231 return nil, fmt.Errorf("cocoon hostname must be set")
232 }
233
234 if args.AdminPassword == "" {
235 return nil, fmt.Errorf("admin password must be set")
236 }
237
238 if args.Logger == nil {
239 args.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{}))
240 }
241
242 if args.SessionSecret == "" {
243 panic("SESSION SECRET WAS NOT SET. THIS IS REQUIRED. ")
244 }
245
246 e := echo.New()
247
248 e.Pre(middleware.RemoveTrailingSlash())
249 e.Pre(slogecho.New(args.Logger))
250 e.Use(echo_session.Middleware(sessions.NewCookieStore([]byte(args.SessionSecret))))
251 e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
252 AllowOrigins: []string{"*"},
253 AllowHeaders: []string{"*"},
254 AllowMethods: []string{"*"},
255 AllowCredentials: true,
256 MaxAge: 100_000_000,
257 }))
258
259 vdtor := validator.New()
260 vdtor.RegisterValidation("atproto-handle", func(fl validator.FieldLevel) bool {
261 if _, err := syntax.ParseHandle(fl.Field().String()); err != nil {
262 return false
263 }
264 return true
265 })
266 vdtor.RegisterValidation("atproto-did", func(fl validator.FieldLevel) bool {
267 if _, err := syntax.ParseDID(fl.Field().String()); err != nil {
268 return false
269 }
270 return true
271 })
272 vdtor.RegisterValidation("atproto-rkey", func(fl validator.FieldLevel) bool {
273 if _, err := syntax.ParseRecordKey(fl.Field().String()); err != nil {
274 return false
275 }
276 return true
277 })
278 vdtor.RegisterValidation("atproto-nsid", func(fl validator.FieldLevel) bool {
279 if _, err := syntax.ParseNSID(fl.Field().String()); err != nil {
280 return false
281 }
282 return true
283 })
284
285 e.Validator = &CustomValidator{validator: vdtor}
286
287 httpd := &http.Server{
288 Addr: args.Addr,
289 Handler: e,
290 // shitty defaults but okay for now, needed for import repo
291 ReadTimeout: 5 * time.Minute,
292 WriteTimeout: 5 * time.Minute,
293 IdleTimeout: 5 * time.Minute,
294 }
295
296 dbType := args.DbType
297 if dbType == "" {
298 dbType = "sqlite"
299 }
300
301 var gdb *gorm.DB
302 var err error
303 switch dbType {
304 case "postgres":
305 if args.DatabaseURL == "" {
306 return nil, fmt.Errorf("database-url must be set when using postgres")
307 }
308 gdb, err = gorm.Open(postgres.Open(args.DatabaseURL), &gorm.Config{})
309 if err != nil {
310 return nil, fmt.Errorf("failed to connect to postgres: %w", err)
311 }
312 args.Logger.Info("connected to PostgreSQL database")
313 default:
314 gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{})
315 if err != nil {
316 return nil, fmt.Errorf("failed to open sqlite database: %w", err)
317 }
318 args.Logger.Info("connected to SQLite database", "path", args.DbName)
319 }
320 dbw := db.NewDB(gdb)
321
322 rkbytes, err := os.ReadFile(args.RotationKeyPath)
323 if err != nil {
324 return nil, err
325 }
326
327 h := util.RobustHTTPClient()
328
329 plcClient, err := plc.NewClient(&plc.ClientArgs{
330 H: h,
331 Service: "https://plc.directory",
332 PdsHostname: args.Hostname,
333 RotationKey: rkbytes,
334 })
335 if err != nil {
336 return nil, err
337 }
338
339 jwkbytes, err := os.ReadFile(args.JwkPath)
340 if err != nil {
341 return nil, err
342 }
343
344 key, err := helpers.ParseJWKFromBytes(jwkbytes)
345 if err != nil {
346 return nil, err
347 }
348
349 var pkey ecdsa.PrivateKey
350 if err := key.Raw(&pkey); err != nil {
351 return nil, err
352 }
353
354 oauthCli := &http.Client{
355 Timeout: 10 * time.Second,
356 }
357
358 var nonceSecret []byte
359 maybeSecret, err := os.ReadFile("nonce.secret")
360 if err != nil && !os.IsNotExist(err) {
361 args.Logger.Error("error attempting to read nonce secret", "error", err)
362 } else {
363 nonceSecret = maybeSecret
364 }
365
366 s := &Server{
367 http: h,
368 httpd: httpd,
369 echo: e,
370 logger: args.Logger,
371 db: dbw,
372 plcClient: plcClient,
373 privateKey: &pkey,
374 config: &config{
375 Version: args.Version,
376 Did: args.Did,
377 Hostname: args.Hostname,
378 ContactEmail: args.ContactEmail,
379 EnforcePeering: false,
380 Relays: args.Relays,
381 AdminPassword: args.AdminPassword,
382 SmtpName: args.SmtpName,
383 SmtpEmail: args.SmtpEmail,
384 BlockstoreVariant: args.BlockstoreVariant,
385 FallbackProxy: args.FallbackProxy,
386 },
387 evtman: events.NewEventManager(events.NewMemPersister()),
388 passport: identity.NewPassport(h, identity.NewMemCache(10_000)),
389
390 dbName: args.DbName,
391 dbType: dbType,
392 s3Config: args.S3Config,
393
394 oauthProvider: provider.NewProvider(provider.Args{
395 Hostname: args.Hostname,
396 ClientManagerArgs: client.ManagerArgs{
397 Cli: oauthCli,
398 Logger: args.Logger,
399 },
400 DpopManagerArgs: dpop.ManagerArgs{
401 NonceSecret: nonceSecret,
402 NonceRotationInterval: constants.NonceMaxRotationInterval / 3,
403 OnNonceSecretCreated: func(newNonce []byte) {
404 if err := os.WriteFile("nonce.secret", newNonce, 0644); err != nil {
405 args.Logger.Error("error writing new nonce secret", "error", err)
406 }
407 },
408 Logger: args.Logger,
409 Hostname: args.Hostname,
410 },
411 }),
412 }
413
414 s.loadTemplates()
415
416 s.repoman = NewRepoMan(s) // TODO: this is way too lazy, stop it
417
418 // TODO: should validate these args
419 if args.SmtpUser == "" || args.SmtpPass == "" || args.SmtpHost == "" || args.SmtpPort == "" || args.SmtpEmail == "" || args.SmtpName == "" {
420 args.Logger.Warn("not enough smtp args were provided. mailing will not work for your server.")
421 } else {
422 mail := mailyak.New(args.SmtpHost+":"+args.SmtpPort, smtp.PlainAuth("", args.SmtpUser, args.SmtpPass, args.SmtpHost))
423 mail.From(s.config.SmtpEmail)
424 mail.FromName(s.config.SmtpName)
425
426 s.mail = mail
427 s.mailLk = &sync.Mutex{}
428 }
429
430 return s, nil
431}
432
433func (s *Server) addRoutes() {
434 // static
435 if s.config.Version == "dev" {
436 s.echo.Static("/static", "server/static")
437 } else {
438 s.echo.GET("/static/*", echo.WrapHandler(http.FileServer(http.FS(staticFS))))
439 }
440
441 // random stuff
442 s.echo.GET("/", s.handleRoot)
443 s.echo.GET("/xrpc/_health", s.handleHealth)
444 s.echo.GET("/.well-known/did.json", s.handleWellKnown)
445 s.echo.GET("/.well-known/oauth-protected-resource", s.handleOauthProtectedResource)
446 s.echo.GET("/.well-known/oauth-authorization-server", s.handleOauthAuthorizationServer)
447 s.echo.GET("/robots.txt", s.handleRobots)
448
449 // public
450 s.echo.GET("/xrpc/com.atproto.identity.resolveHandle", s.handleResolveHandle)
451 s.echo.POST("/xrpc/com.atproto.server.createAccount", s.handleCreateAccount)
452 s.echo.POST("/xrpc/com.atproto.server.createSession", s.handleCreateSession)
453 s.echo.GET("/xrpc/com.atproto.server.describeServer", s.handleDescribeServer)
454 s.echo.POST("/xrpc/com.atproto.server.reserveSigningKey", s.handleServerReserveSigningKey)
455
456 s.echo.GET("/xrpc/com.atproto.repo.describeRepo", s.handleDescribeRepo)
457 s.echo.GET("/xrpc/com.atproto.sync.listRepos", s.handleListRepos)
458 s.echo.GET("/xrpc/com.atproto.repo.listRecords", s.handleListRecords)
459 s.echo.GET("/xrpc/com.atproto.repo.getRecord", s.handleRepoGetRecord)
460 s.echo.GET("/xrpc/com.atproto.sync.getRecord", s.handleSyncGetRecord)
461 s.echo.GET("/xrpc/com.atproto.sync.getBlocks", s.handleGetBlocks)
462 s.echo.GET("/xrpc/com.atproto.sync.getLatestCommit", s.handleSyncGetLatestCommit)
463 s.echo.GET("/xrpc/com.atproto.sync.getRepoStatus", s.handleSyncGetRepoStatus)
464 s.echo.GET("/xrpc/com.atproto.sync.getRepo", s.handleSyncGetRepo)
465 s.echo.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleSyncSubscribeRepos)
466 s.echo.GET("/xrpc/com.atproto.sync.listBlobs", s.handleSyncListBlobs)
467 s.echo.GET("/xrpc/com.atproto.sync.getBlob", s.handleSyncGetBlob)
468
469 // account
470 s.echo.GET("/account", s.handleAccount)
471 s.echo.POST("/account/revoke", s.handleAccountRevoke)
472 s.echo.GET("/account/signin", s.handleAccountSigninGet)
473 s.echo.POST("/account/signin", s.handleAccountSigninPost)
474 s.echo.GET("/account/signout", s.handleAccountSignout)
475
476 // oauth account
477 s.echo.GET("/oauth/jwks", s.handleOauthJwks)
478 s.echo.GET("/oauth/authorize", s.handleOauthAuthorizeGet)
479 s.echo.POST("/oauth/authorize", s.handleOauthAuthorizePost)
480
481 // oauth authorization
482 s.echo.POST("/oauth/par", s.handleOauthPar, s.oauthProvider.BaseMiddleware)
483 s.echo.POST("/oauth/token", s.handleOauthToken, s.oauthProvider.BaseMiddleware)
484
485 // authed
486 s.echo.GET("/xrpc/com.atproto.server.getSession", s.handleGetSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
487 s.echo.POST("/xrpc/com.atproto.server.refreshSession", s.handleRefreshSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
488 s.echo.POST("/xrpc/com.atproto.server.deleteSession", s.handleDeleteSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
489 s.echo.GET("/xrpc/com.atproto.identity.getRecommendedDidCredentials", s.handleGetRecommendedDidCredentials, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
490 s.echo.POST("/xrpc/com.atproto.identity.updateHandle", s.handleIdentityUpdateHandle, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
491 s.echo.POST("/xrpc/com.atproto.identity.requestPlcOperationSignature", s.handleIdentityRequestPlcOperationSignature, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
492 s.echo.POST("/xrpc/com.atproto.identity.signPlcOperation", s.handleSignPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
493 s.echo.POST("/xrpc/com.atproto.identity.submitPlcOperation", s.handleSubmitPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
494 s.echo.POST("/xrpc/com.atproto.server.confirmEmail", s.handleServerConfirmEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
495 s.echo.POST("/xrpc/com.atproto.server.requestEmailConfirmation", s.handleServerRequestEmailConfirmation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
496 s.echo.POST("/xrpc/com.atproto.server.requestPasswordReset", s.handleServerRequestPasswordReset) // AUTH NOT REQUIRED FOR THIS ONE
497 s.echo.POST("/xrpc/com.atproto.server.requestEmailUpdate", s.handleServerRequestEmailUpdate, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
498 s.echo.POST("/xrpc/com.atproto.server.resetPassword", s.handleServerResetPassword, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
499 s.echo.POST("/xrpc/com.atproto.server.updateEmail", s.handleServerUpdateEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
500 s.echo.GET("/xrpc/com.atproto.server.getServiceAuth", s.handleServerGetServiceAuth, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
501 s.echo.GET("/xrpc/com.atproto.server.checkAccountStatus", s.handleServerCheckAccountStatus, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
502 s.echo.POST("/xrpc/com.atproto.server.deactivateAccount", s.handleServerDeactivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
503 s.echo.POST("/xrpc/com.atproto.server.activateAccount", s.handleServerActivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
504 s.echo.POST("/xrpc/com.atproto.server.requestAccountDelete", s.handleServerRequestAccountDelete, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
505 s.echo.POST("/xrpc/com.atproto.server.deleteAccount", s.handleServerDeleteAccount)
506
507 // repo
508 s.echo.GET("/xrpc/com.atproto.repo.listMissingBlobs", s.handleListMissingBlobs, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
509 s.echo.POST("/xrpc/com.atproto.repo.createRecord", s.handleCreateRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
510 s.echo.POST("/xrpc/com.atproto.repo.putRecord", s.handlePutRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
511 s.echo.POST("/xrpc/com.atproto.repo.deleteRecord", s.handleDeleteRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
512 s.echo.POST("/xrpc/com.atproto.repo.applyWrites", s.handleApplyWrites, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
513 s.echo.POST("/xrpc/com.atproto.repo.uploadBlob", s.handleRepoUploadBlob, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
514 s.echo.POST("/xrpc/com.atproto.repo.importRepo", s.handleRepoImportRepo, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
515
516 // stupid silly endpoints
517 s.echo.GET("/xrpc/app.bsky.actor.getPreferences", s.handleActorGetPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
518 s.echo.POST("/xrpc/app.bsky.actor.putPreferences", s.handleActorPutPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
519 s.echo.GET("/xrpc/app.bsky.feed.getFeed", s.handleProxyBskyFeedGetFeed, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
520
521 // admin routes
522 s.echo.POST("/xrpc/com.atproto.server.createInviteCode", s.handleCreateInviteCode, s.handleAdminMiddleware)
523 s.echo.POST("/xrpc/com.atproto.server.createInviteCodes", s.handleCreateInviteCodes, s.handleAdminMiddleware)
524
525 // are there any routes that we should be allowing without auth? i dont think so but idk
526 s.echo.GET("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
527 s.echo.POST("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
528}
529
530func (s *Server) Serve(ctx context.Context) error {
531 s.addRoutes()
532
533 s.logger.Info("migrating...")
534
535 s.db.AutoMigrate(
536 &models.Actor{},
537 &models.Repo{},
538 &models.InviteCode{},
539 &models.Token{},
540 &models.RefreshToken{},
541 &models.Block{},
542 &models.Record{},
543 &models.Blob{},
544 &models.BlobPart{},
545 &models.ReservedKey{},
546 &provider.OauthToken{},
547 &provider.OauthAuthorizationRequest{},
548 )
549
550 s.logger.Info("starting cocoon")
551
552 go func() {
553 if err := s.httpd.ListenAndServe(); err != nil {
554 panic(err)
555 }
556 }()
557
558 go s.backupRoutine()
559
560 go func() {
561 if err := s.requestCrawl(ctx); err != nil {
562 s.logger.Error("error requesting crawls", "err", err)
563 }
564 }()
565
566 <-ctx.Done()
567
568 fmt.Println("shut down")
569
570 return nil
571}
572
573func (s *Server) requestCrawl(ctx context.Context) error {
574 logger := s.logger.With("component", "request-crawl")
575 s.requestCrawlMu.Lock()
576 defer s.requestCrawlMu.Unlock()
577
578 logger.Info("requesting crawl with configured relays")
579
580 if time.Now().Sub(s.lastRequestCrawl) <= 1*time.Minute {
581 return fmt.Errorf("a crawl request has already been made within the last minute")
582 }
583
584 for _, relay := range s.config.Relays {
585 logger := logger.With("relay", relay)
586 logger.Info("requesting crawl from relay")
587 cli := xrpc.Client{Host: relay}
588 if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{
589 Hostname: s.config.Hostname,
590 }); err != nil {
591 logger.Error("error requesting crawl", "err", err)
592 } else {
593 logger.Info("crawl requested successfully")
594 }
595 }
596
597 s.lastRequestCrawl = time.Now()
598
599 return nil
600}
601
602func (s *Server) doBackup() {
603 if s.dbType == "postgres" {
604 s.logger.Info("skipping S3 backup - PostgreSQL backups should be handled externally (pg_dump, managed database backups, etc.)")
605 return
606 }
607
608 start := time.Now()
609
610 s.logger.Info("beginning backup to s3...")
611
612 var buf bytes.Buffer
613 if err := func() error {
614 s.logger.Info("reading database bytes...")
615 s.db.Lock()
616 defer s.db.Unlock()
617
618 sf, err := os.Open(s.dbName)
619 if err != nil {
620 return fmt.Errorf("error opening database for backup: %w", err)
621 }
622 defer sf.Close()
623
624 if _, err := io.Copy(&buf, sf); err != nil {
625 return fmt.Errorf("error reading bytes of backup db: %w", err)
626 }
627
628 return nil
629 }(); err != nil {
630 s.logger.Error("error backing up database", "error", err)
631 return
632 }
633
634 if err := func() error {
635 s.logger.Info("sending to s3...")
636
637 currTime := time.Now().Format("2006-01-02_15-04-05")
638 key := "cocoon-backup-" + currTime + ".db"
639
640 config := &aws.Config{
641 Region: aws.String(s.s3Config.Region),
642 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
643 }
644
645 if s.s3Config.Endpoint != "" {
646 config.Endpoint = aws.String(s.s3Config.Endpoint)
647 config.S3ForcePathStyle = aws.Bool(true)
648 }
649
650 sess, err := session.NewSession(config)
651 if err != nil {
652 return err
653 }
654
655 svc := s3.New(sess)
656
657 if _, err := svc.PutObject(&s3.PutObjectInput{
658 Bucket: aws.String(s.s3Config.Bucket),
659 Key: aws.String(key),
660 Body: bytes.NewReader(buf.Bytes()),
661 }); err != nil {
662 return fmt.Errorf("error uploading file to s3: %w", err)
663 }
664
665 s.logger.Info("finished uploading backup to s3", "key", key, "duration", time.Now().Sub(start).Seconds())
666
667 return nil
668 }(); err != nil {
669 s.logger.Error("error uploading database backup", "error", err)
670 return
671 }
672
673 os.WriteFile("last-backup.txt", []byte(time.Now().String()), 0644)
674}
675
676func (s *Server) backupRoutine() {
677 if s.s3Config == nil || !s.s3Config.BackupsEnabled {
678 return
679 }
680
681 if s.s3Config.Region == "" {
682 s.logger.Warn("no s3 region configured but backups are enabled. backups will not run.")
683 return
684 }
685
686 if s.s3Config.Bucket == "" {
687 s.logger.Warn("no s3 bucket configured but backups are enabled. backups will not run.")
688 return
689 }
690
691 if s.s3Config.AccessKey == "" {
692 s.logger.Warn("no s3 access key configured but backups are enabled. backups will not run.")
693 return
694 }
695
696 if s.s3Config.SecretKey == "" {
697 s.logger.Warn("no s3 secret key configured but backups are enabled. backups will not run.")
698 return
699 }
700
701 shouldBackupNow := false
702 lastBackupStr, err := os.ReadFile("last-backup.txt")
703 if err != nil {
704 shouldBackupNow = true
705 } else {
706 lastBackup, err := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", string(lastBackupStr))
707 if err != nil {
708 shouldBackupNow = true
709 } else if time.Now().Sub(lastBackup).Seconds() > 3600 {
710 shouldBackupNow = true
711 }
712 }
713
714 if shouldBackupNow {
715 go s.doBackup()
716 }
717
718 ticker := time.NewTicker(time.Hour)
719 for range ticker.C {
720 go s.doBackup()
721 }
722}
723
724func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error {
725 if err := s.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil {
726 return err
727 }
728
729 return nil
730}