1package server
2
3import (
4 "bytes"
5 "context"
6 "crypto/ecdsa"
7 "embed"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "net/http"
13 "net/smtp"
14 "os"
15 "path/filepath"
16 "sync"
17 "text/template"
18 "time"
19
20 "github.com/aws/aws-sdk-go/aws"
21 "github.com/aws/aws-sdk-go/aws/credentials"
22 "github.com/aws/aws-sdk-go/aws/session"
23 "github.com/aws/aws-sdk-go/service/s3"
24 "github.com/bluesky-social/indigo/api/atproto"
25 "github.com/bluesky-social/indigo/atproto/syntax"
26 "github.com/bluesky-social/indigo/events"
27 "github.com/bluesky-social/indigo/util"
28 "github.com/bluesky-social/indigo/xrpc"
29 "github.com/domodwyer/mailyak/v3"
30 "github.com/go-playground/validator"
31 "github.com/gorilla/sessions"
32 "github.com/haileyok/cocoon/identity"
33 "github.com/haileyok/cocoon/internal/db"
34 "github.com/haileyok/cocoon/internal/helpers"
35 "github.com/haileyok/cocoon/models"
36 "github.com/haileyok/cocoon/oauth/client"
37 "github.com/haileyok/cocoon/oauth/constants"
38 "github.com/haileyok/cocoon/oauth/dpop"
39 "github.com/haileyok/cocoon/oauth/provider"
40 "github.com/haileyok/cocoon/plc"
41 "github.com/ipfs/go-cid"
42 echo_session "github.com/labstack/echo-contrib/session"
43 "github.com/labstack/echo/v4"
44 "github.com/labstack/echo/v4/middleware"
45 slogecho "github.com/samber/slog-echo"
46 "gorm.io/driver/postgres"
47 "gorm.io/driver/sqlite"
48 "gorm.io/gorm"
49)
50
const (
	// AccountSessionMaxAge is a 30-day duration. Presumably it bounds the
	// lifetime of the account web session — confirm at the use site, which is
	// not in this file.
	AccountSessionMaxAge = 30 * 24 * time.Hour // 30 days
)
54
// S3Config carries the S3 settings used by the server. Within this file it
// drives the periodic database backup (see backupRoutine and doBackup).
type S3Config struct {
	// BackupsEnabled turns the backup routine on; when false backupRoutine
	// returns immediately.
	BackupsEnabled bool
	// BlobstoreEnabled is not read in this file; presumably it switches blob
	// storage to S3 — confirm against the blob handlers.
	BlobstoreEnabled bool
	// Endpoint is an optional custom S3 endpoint. When non-empty, doBackup
	// also forces path-style addressing.
	Endpoint string
	// Region, Bucket, AccessKey and SecretKey must all be non-empty for
	// backups to run; backupRoutine logs a warning and exits otherwise.
	Region    string
	Bucket    string
	AccessKey string
	SecretKey string
	// CDNUrl is not read in this file; presumably a public base URL for
	// served blobs — confirm against the blob handlers.
	CDNUrl string
}
65
// Server bundles the HTTP listener, storage, identity and OAuth components of
// the service. Build one with New and run it with Serve.
type Server struct {
	// http is the shared outbound client created in New; the same client is
	// handed to the PLC client and the identity passport.
	http *http.Client
	// httpd is the listener started by Serve; its handler is the echo instance.
	httpd *http.Server
	// mail and mailLk are only set when every SMTP argument was supplied to
	// New; both are nil otherwise.
	mail   *mailyak.MailYak
	mailLk *sync.Mutex
	// echo is the router all routes are registered on (see addRoutes).
	echo *echo.Echo
	// db wraps the gorm connection opened in New (SQLite or PostgreSQL).
	db *db.DB
	// plcClient talks to https://plc.directory using the rotation key file.
	plcClient *plc.Client
	logger    *slog.Logger
	config    *config
	// privateKey is the ECDSA key parsed from the JWK file given to New.
	privateKey *ecdsa.PrivateKey
	// repoman is created by NewRepoMan(s) at the end of New.
	repoman       *RepoMan
	oauthProvider *provider.Provider
	// evtman is created in New with an in-memory persister.
	evtman *events.EventManager
	// passport is created in New with an in-memory cache sized 10_000.
	passport *identity.Passport
	// fallbackProxy is not assigned by New (the value from Args is stored in
	// config.FallbackProxy instead). NOTE(review): possibly unused — confirm.
	fallbackProxy string

	// lastRequestCrawl is the time of the last completed requestCrawl run;
	// requestCrawlMu guards it and serialises requestCrawl calls.
	lastRequestCrawl time.Time
	requestCrawlMu   sync.Mutex

	// dbName is the SQLite file path; doBackup reads this file for backups.
	dbName string
	// dbType is "postgres" or the default "sqlite" (see New).
	dbType string
	// s3Config may be nil, in which case backups are disabled.
	s3Config *S3Config
}
90
// Args is the input to New. Addr, DbName, Did, ContactEmail, Hostname,
// AdminPassword and SessionSecret are required by New.
type Args struct {
	// Addr is the listen address given to the http.Server.
	Addr string
	// DbName is the SQLite database path. New requires it even when DbType is
	// "postgres".
	DbName string
	// DbType selects the database driver: "postgres", or anything else
	// (including empty, which is normalised to "sqlite") for SQLite.
	DbType string
	// DatabaseURL is the PostgreSQL DSN; required when DbType is "postgres".
	DatabaseURL string
	// Logger defaults to a text handler on stdout when nil.
	Logger *slog.Logger
	// Version "dev" makes templates and static files load from disk rather
	// than from the embedded file systems.
	Version string
	// Did must parse as a DID.
	Did string
	// Hostname is passed to the PLC client and the OAuth provider, and is the
	// hostname sent to relays in crawl requests.
	Hostname string
	// RotationKeyPath is read in New and its bytes are given to the PLC client.
	RotationKeyPath string
	// JwkPath is read in New and parsed into the server's ECDSA private key.
	JwkPath      string
	ContactEmail string
	// Relays are the hosts contacted by requestCrawl.
	Relays        []string
	AdminPassword string
	RequireInvite bool

	// SMTP settings. Mailing is only configured when all six are non-empty;
	// otherwise New logs a warning and leaves mail disabled.
	SmtpUser  string
	SmtpPass  string
	SmtpHost  string
	SmtpPort  string
	SmtpEmail string
	SmtpName  string

	// S3Config may be nil; see S3Config for how it is used.
	S3Config *S3Config

	// SessionSecret keys the cookie session store.
	SessionSecret string

	// BlockstoreVariant and FallbackProxy are copied verbatim into the
	// server's config; their consumers are outside this file.
	BlockstoreVariant BlockstoreVariant
	FallbackProxy     string
}
121
// config is the subset of Args the server keeps after construction. New
// copies each field from the Args field of the same name, except
// EnforcePeering.
type config struct {
	// Version "dev" switches templates and static assets to on-disk loading.
	Version      string
	Did          string
	Hostname     string
	ContactEmail string
	// EnforcePeering is always set to false by New.
	EnforcePeering bool
	// Relays are the hosts contacted by requestCrawl.
	Relays        []string
	AdminPassword string
	RequireInvite bool
	// SmtpEmail and SmtpName are used as the From address and name of the
	// mailer when SMTP is configured.
	SmtpEmail         string
	SmtpName          string
	BlockstoreVariant BlockstoreVariant
	FallbackProxy     string
}
136
// CustomValidator adapts a go-playground validator so it can be installed as
// the echo instance's Validator (see New).
type CustomValidator struct {
	validator *validator.Validate
}
140
// ValidationError wraps a validation failure and exposes the field name and
// validation tag of the first failing rule (see CustomValidator.Validate).
type ValidationError struct {
	// error is the original error returned by the validator.
	error
	// Field is the name of the first field that failed validation.
	Field string
	// Tag is the validation tag that the field failed.
	Tag string
}
146
147func (cv *CustomValidator) Validate(i any) error {
148 if err := cv.validator.Struct(i); err != nil {
149 var validateErrors validator.ValidationErrors
150 if errors.As(err, &validateErrors) && len(validateErrors) > 0 {
151 first := validateErrors[0]
152 return ValidationError{
153 error: err,
154 Field: first.Field(),
155 Tag: first.Tag(),
156 }
157 }
158
159 return err
160 }
161
162 return nil
163}
164
// templateFS holds the contents of the templates directory embedded at build
// time; loadTemplates parses "templates/*.html" from it outside dev mode.
//
//go:embed templates/*
var templateFS embed.FS

// staticFS holds the contents of the static directory embedded at build time;
// addRoutes serves it under /static/* outside dev mode.
//
//go:embed static/*
var staticFS embed.FS
170
// TemplateRenderer is the echo renderer installed by loadTemplates.
//
// NOTE(review): templates are parsed with text/template, which performs no
// HTML escaping; if any rendered value is user-controlled, html/template
// would be the safer choice — confirm against the templates.
type TemplateRenderer struct {
	// templates is the parsed template set executed by Render.
	templates *template.Template
	// isDev makes Render re-parse the templates from templatePath on every
	// call, so edits on disk show up without a restart.
	isDev bool
	// templatePath is the glob pattern re-parsed in dev mode; empty otherwise.
	templatePath string
}
176
177func (s *Server) loadTemplates() {
178 absPath, _ := filepath.Abs("server/templates/*.html")
179 if s.config.Version == "dev" {
180 tmpl := template.Must(template.ParseGlob(absPath))
181 s.echo.Renderer = &TemplateRenderer{
182 templates: tmpl,
183 isDev: true,
184 templatePath: absPath,
185 }
186 } else {
187 tmpl := template.Must(template.ParseFS(templateFS, "templates/*.html"))
188 s.echo.Renderer = &TemplateRenderer{
189 templates: tmpl,
190 isDev: false,
191 }
192 }
193}
194
195func (t *TemplateRenderer) Render(w io.Writer, name string, data any, c echo.Context) error {
196 if t.isDev {
197 tmpl, err := template.ParseGlob(t.templatePath)
198 if err != nil {
199 return err
200 }
201 t.templates = tmpl
202 }
203
204 if viewContext, isMap := data.(map[string]any); isMap {
205 viewContext["reverse"] = c.Echo().Reverse
206 }
207
208 return t.templates.ExecuteTemplate(w, name, data)
209}
210
211func New(args *Args) (*Server, error) {
212 if args.Addr == "" {
213 return nil, fmt.Errorf("addr must be set")
214 }
215
216 if args.DbName == "" {
217 return nil, fmt.Errorf("db name must be set")
218 }
219
220 if args.Did == "" {
221 return nil, fmt.Errorf("cocoon did must be set")
222 }
223
224 if args.ContactEmail == "" {
225 return nil, fmt.Errorf("cocoon contact email is required")
226 }
227
228 if _, err := syntax.ParseDID(args.Did); err != nil {
229 return nil, fmt.Errorf("error parsing cocoon did: %w", err)
230 }
231
232 if args.Hostname == "" {
233 return nil, fmt.Errorf("cocoon hostname must be set")
234 }
235
236 if args.AdminPassword == "" {
237 return nil, fmt.Errorf("admin password must be set")
238 }
239
240 if args.Logger == nil {
241 args.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{}))
242 }
243
244 if args.SessionSecret == "" {
245 panic("SESSION SECRET WAS NOT SET. THIS IS REQUIRED. ")
246 }
247
248 e := echo.New()
249
250 e.Pre(middleware.RemoveTrailingSlash())
251 e.Pre(slogecho.New(args.Logger))
252 e.Use(echo_session.Middleware(sessions.NewCookieStore([]byte(args.SessionSecret))))
253 e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
254 AllowOrigins: []string{"*"},
255 AllowHeaders: []string{"*"},
256 AllowMethods: []string{"*"},
257 AllowCredentials: true,
258 MaxAge: 100_000_000,
259 }))
260
261 vdtor := validator.New()
262 vdtor.RegisterValidation("atproto-handle", func(fl validator.FieldLevel) bool {
263 if _, err := syntax.ParseHandle(fl.Field().String()); err != nil {
264 return false
265 }
266 return true
267 })
268 vdtor.RegisterValidation("atproto-did", func(fl validator.FieldLevel) bool {
269 if _, err := syntax.ParseDID(fl.Field().String()); err != nil {
270 return false
271 }
272 return true
273 })
274 vdtor.RegisterValidation("atproto-rkey", func(fl validator.FieldLevel) bool {
275 if _, err := syntax.ParseRecordKey(fl.Field().String()); err != nil {
276 return false
277 }
278 return true
279 })
280 vdtor.RegisterValidation("atproto-nsid", func(fl validator.FieldLevel) bool {
281 if _, err := syntax.ParseNSID(fl.Field().String()); err != nil {
282 return false
283 }
284 return true
285 })
286
287 e.Validator = &CustomValidator{validator: vdtor}
288
289 httpd := &http.Server{
290 Addr: args.Addr,
291 Handler: e,
292 // shitty defaults but okay for now, needed for import repo
293 ReadTimeout: 5 * time.Minute,
294 WriteTimeout: 5 * time.Minute,
295 IdleTimeout: 5 * time.Minute,
296 }
297
298 dbType := args.DbType
299 if dbType == "" {
300 dbType = "sqlite"
301 }
302
303 var gdb *gorm.DB
304 var err error
305 switch dbType {
306 case "postgres":
307 if args.DatabaseURL == "" {
308 return nil, fmt.Errorf("database-url must be set when using postgres")
309 }
310 gdb, err = gorm.Open(postgres.Open(args.DatabaseURL), &gorm.Config{})
311 if err != nil {
312 return nil, fmt.Errorf("failed to connect to postgres: %w", err)
313 }
314 args.Logger.Info("connected to PostgreSQL database")
315 default:
316 gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{})
317 if err != nil {
318 return nil, fmt.Errorf("failed to open sqlite database: %w", err)
319 }
320 args.Logger.Info("connected to SQLite database", "path", args.DbName)
321 }
322 dbw := db.NewDB(gdb)
323
324 rkbytes, err := os.ReadFile(args.RotationKeyPath)
325 if err != nil {
326 return nil, err
327 }
328
329 h := util.RobustHTTPClient()
330
331 plcClient, err := plc.NewClient(&plc.ClientArgs{
332 H: h,
333 Service: "https://plc.directory",
334 PdsHostname: args.Hostname,
335 RotationKey: rkbytes,
336 })
337 if err != nil {
338 return nil, err
339 }
340
341 jwkbytes, err := os.ReadFile(args.JwkPath)
342 if err != nil {
343 return nil, err
344 }
345
346 key, err := helpers.ParseJWKFromBytes(jwkbytes)
347 if err != nil {
348 return nil, err
349 }
350
351 var pkey ecdsa.PrivateKey
352 if err := key.Raw(&pkey); err != nil {
353 return nil, err
354 }
355
356 oauthCli := &http.Client{
357 Timeout: 10 * time.Second,
358 }
359
360 var nonceSecret []byte
361 maybeSecret, err := os.ReadFile("nonce.secret")
362 if err != nil && !os.IsNotExist(err) {
363 args.Logger.Error("error attempting to read nonce secret", "error", err)
364 } else {
365 nonceSecret = maybeSecret
366 }
367
368 s := &Server{
369 http: h,
370 httpd: httpd,
371 echo: e,
372 logger: args.Logger,
373 db: dbw,
374 plcClient: plcClient,
375 privateKey: &pkey,
376 config: &config{
377 Version: args.Version,
378 Did: args.Did,
379 Hostname: args.Hostname,
380 ContactEmail: args.ContactEmail,
381 EnforcePeering: false,
382 Relays: args.Relays,
383 AdminPassword: args.AdminPassword,
384 RequireInvite: args.RequireInvite,
385 SmtpName: args.SmtpName,
386 SmtpEmail: args.SmtpEmail,
387 BlockstoreVariant: args.BlockstoreVariant,
388 FallbackProxy: args.FallbackProxy,
389 },
390 evtman: events.NewEventManager(events.NewMemPersister()),
391 passport: identity.NewPassport(h, identity.NewMemCache(10_000)),
392
393 dbName: args.DbName,
394 dbType: dbType,
395 s3Config: args.S3Config,
396
397 oauthProvider: provider.NewProvider(provider.Args{
398 Hostname: args.Hostname,
399 ClientManagerArgs: client.ManagerArgs{
400 Cli: oauthCli,
401 Logger: args.Logger,
402 },
403 DpopManagerArgs: dpop.ManagerArgs{
404 NonceSecret: nonceSecret,
405 NonceRotationInterval: constants.NonceMaxRotationInterval / 3,
406 OnNonceSecretCreated: func(newNonce []byte) {
407 if err := os.WriteFile("nonce.secret", newNonce, 0644); err != nil {
408 args.Logger.Error("error writing new nonce secret", "error", err)
409 }
410 },
411 Logger: args.Logger,
412 Hostname: args.Hostname,
413 },
414 }),
415 }
416
417 s.loadTemplates()
418
419 s.repoman = NewRepoMan(s) // TODO: this is way too lazy, stop it
420
421 // TODO: should validate these args
422 if args.SmtpUser == "" || args.SmtpPass == "" || args.SmtpHost == "" || args.SmtpPort == "" || args.SmtpEmail == "" || args.SmtpName == "" {
423 args.Logger.Warn("not enough smtp args were provided. mailing will not work for your server.")
424 } else {
425 mail := mailyak.New(args.SmtpHost+":"+args.SmtpPort, smtp.PlainAuth("", args.SmtpUser, args.SmtpPass, args.SmtpHost))
426 mail.From(s.config.SmtpEmail)
427 mail.FromName(s.config.SmtpName)
428
429 s.mail = mail
430 s.mailLk = &sync.Mutex{}
431 }
432
433 return s, nil
434}
435
436func (s *Server) addRoutes() {
437 // static
438 if s.config.Version == "dev" {
439 s.echo.Static("/static", "server/static")
440 } else {
441 s.echo.GET("/static/*", echo.WrapHandler(http.FileServer(http.FS(staticFS))))
442 }
443
444 // random stuff
445 s.echo.GET("/", s.handleRoot)
446 s.echo.GET("/xrpc/_health", s.handleHealth)
447 s.echo.GET("/.well-known/did.json", s.handleWellKnown)
448 s.echo.GET("/.well-known/atproto-did", s.handleAtprotoDid)
449 s.echo.GET("/.well-known/oauth-protected-resource", s.handleOauthProtectedResource)
450 s.echo.GET("/.well-known/oauth-authorization-server", s.handleOauthAuthorizationServer)
451 s.echo.GET("/robots.txt", s.handleRobots)
452
453 // public
454 s.echo.GET("/xrpc/com.atproto.identity.resolveHandle", s.handleResolveHandle)
455 s.echo.POST("/xrpc/com.atproto.server.createAccount", s.handleCreateAccount)
456 s.echo.POST("/xrpc/com.atproto.server.createSession", s.handleCreateSession)
457 s.echo.GET("/xrpc/com.atproto.server.describeServer", s.handleDescribeServer)
458 s.echo.POST("/xrpc/com.atproto.server.reserveSigningKey", s.handleServerReserveSigningKey)
459
460 s.echo.GET("/xrpc/com.atproto.repo.describeRepo", s.handleDescribeRepo)
461 s.echo.GET("/xrpc/com.atproto.sync.listRepos", s.handleListRepos)
462 s.echo.GET("/xrpc/com.atproto.repo.listRecords", s.handleListRecords)
463 s.echo.GET("/xrpc/com.atproto.repo.getRecord", s.handleRepoGetRecord)
464 s.echo.GET("/xrpc/com.atproto.sync.getRecord", s.handleSyncGetRecord)
465 s.echo.GET("/xrpc/com.atproto.sync.getBlocks", s.handleGetBlocks)
466 s.echo.GET("/xrpc/com.atproto.sync.getLatestCommit", s.handleSyncGetLatestCommit)
467 s.echo.GET("/xrpc/com.atproto.sync.getRepoStatus", s.handleSyncGetRepoStatus)
468 s.echo.GET("/xrpc/com.atproto.sync.getRepo", s.handleSyncGetRepo)
469 s.echo.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleSyncSubscribeRepos)
470 s.echo.GET("/xrpc/com.atproto.sync.listBlobs", s.handleSyncListBlobs)
471 s.echo.GET("/xrpc/com.atproto.sync.getBlob", s.handleSyncGetBlob)
472
473 // labels
474 s.echo.GET("/xrpc/com.atproto.label.queryLabels", s.handleLabelQueryLabels)
475
476 // account
477 s.echo.GET("/account", s.handleAccount)
478 s.echo.POST("/account/revoke", s.handleAccountRevoke)
479 s.echo.GET("/account/signin", s.handleAccountSigninGet)
480 s.echo.POST("/account/signin", s.handleAccountSigninPost)
481 s.echo.GET("/account/signout", s.handleAccountSignout)
482
483 // oauth account
484 s.echo.GET("/oauth/jwks", s.handleOauthJwks)
485 s.echo.GET("/oauth/authorize", s.handleOauthAuthorizeGet)
486 s.echo.POST("/oauth/authorize", s.handleOauthAuthorizePost)
487
488 // oauth authorization
489 s.echo.POST("/oauth/par", s.handleOauthPar, s.oauthProvider.BaseMiddleware)
490 s.echo.POST("/oauth/token", s.handleOauthToken, s.oauthProvider.BaseMiddleware)
491
492 // authed
493 s.echo.GET("/xrpc/com.atproto.server.getSession", s.handleGetSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
494 s.echo.POST("/xrpc/com.atproto.server.refreshSession", s.handleRefreshSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
495 s.echo.POST("/xrpc/com.atproto.server.deleteSession", s.handleDeleteSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
496 s.echo.GET("/xrpc/com.atproto.identity.getRecommendedDidCredentials", s.handleGetRecommendedDidCredentials, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
497 s.echo.POST("/xrpc/com.atproto.identity.updateHandle", s.handleIdentityUpdateHandle, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
498 s.echo.POST("/xrpc/com.atproto.identity.requestPlcOperationSignature", s.handleIdentityRequestPlcOperationSignature, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
499 s.echo.POST("/xrpc/com.atproto.identity.signPlcOperation", s.handleSignPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
500 s.echo.POST("/xrpc/com.atproto.identity.submitPlcOperation", s.handleSubmitPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
501 s.echo.POST("/xrpc/com.atproto.server.confirmEmail", s.handleServerConfirmEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
502 s.echo.POST("/xrpc/com.atproto.server.requestEmailConfirmation", s.handleServerRequestEmailConfirmation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
503 s.echo.POST("/xrpc/com.atproto.server.requestPasswordReset", s.handleServerRequestPasswordReset) // AUTH NOT REQUIRED FOR THIS ONE
504 s.echo.POST("/xrpc/com.atproto.server.requestEmailUpdate", s.handleServerRequestEmailUpdate, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
505 s.echo.POST("/xrpc/com.atproto.server.resetPassword", s.handleServerResetPassword, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
506 s.echo.POST("/xrpc/com.atproto.server.updateEmail", s.handleServerUpdateEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
507 s.echo.GET("/xrpc/com.atproto.server.getServiceAuth", s.handleServerGetServiceAuth, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
508 s.echo.GET("/xrpc/com.atproto.server.checkAccountStatus", s.handleServerCheckAccountStatus, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
509 s.echo.POST("/xrpc/com.atproto.server.deactivateAccount", s.handleServerDeactivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
510 s.echo.POST("/xrpc/com.atproto.server.activateAccount", s.handleServerActivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
511 s.echo.POST("/xrpc/com.atproto.server.requestAccountDelete", s.handleServerRequestAccountDelete, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
512 s.echo.POST("/xrpc/com.atproto.server.deleteAccount", s.handleServerDeleteAccount)
513
514 // repo
515 s.echo.GET("/xrpc/com.atproto.repo.listMissingBlobs", s.handleListMissingBlobs, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
516 s.echo.POST("/xrpc/com.atproto.repo.createRecord", s.handleCreateRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
517 s.echo.POST("/xrpc/com.atproto.repo.putRecord", s.handlePutRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
518 s.echo.POST("/xrpc/com.atproto.repo.deleteRecord", s.handleDeleteRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
519 s.echo.POST("/xrpc/com.atproto.repo.applyWrites", s.handleApplyWrites, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
520 s.echo.POST("/xrpc/com.atproto.repo.uploadBlob", s.handleRepoUploadBlob, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
521 s.echo.POST("/xrpc/com.atproto.repo.importRepo", s.handleRepoImportRepo, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
522
523 // stupid silly endpoints
524 s.echo.GET("/xrpc/app.bsky.actor.getPreferences", s.handleActorGetPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
525 s.echo.POST("/xrpc/app.bsky.actor.putPreferences", s.handleActorPutPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
526 s.echo.GET("/xrpc/app.bsky.feed.getFeed", s.handleProxyBskyFeedGetFeed, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
527
528 // admin routes
529 s.echo.POST("/xrpc/com.atproto.server.createInviteCode", s.handleCreateInviteCode, s.handleAdminMiddleware)
530 s.echo.POST("/xrpc/com.atproto.server.createInviteCodes", s.handleCreateInviteCodes, s.handleAdminMiddleware)
531
532 // are there any routes that we should be allowing without auth? i dont think so but idk
533 s.echo.GET("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
534 s.echo.POST("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
535}
536
537func (s *Server) Serve(ctx context.Context) error {
538 s.addRoutes()
539
540 s.logger.Info("migrating...")
541
542 s.db.AutoMigrate(
543 &models.Actor{},
544 &models.Repo{},
545 &models.InviteCode{},
546 &models.Token{},
547 &models.RefreshToken{},
548 &models.Block{},
549 &models.Record{},
550 &models.Blob{},
551 &models.BlobPart{},
552 &models.ReservedKey{},
553 &provider.OauthToken{},
554 &provider.OauthAuthorizationRequest{},
555 )
556
557 s.logger.Info("starting cocoon")
558
559 go func() {
560 if err := s.httpd.ListenAndServe(); err != nil {
561 panic(err)
562 }
563 }()
564
565 go s.backupRoutine()
566
567 go func() {
568 if err := s.requestCrawl(ctx); err != nil {
569 s.logger.Error("error requesting crawls", "err", err)
570 }
571 }()
572
573 <-ctx.Done()
574
575 fmt.Println("shut down")
576
577 return nil
578}
579
580func (s *Server) requestCrawl(ctx context.Context) error {
581 logger := s.logger.With("component", "request-crawl")
582 s.requestCrawlMu.Lock()
583 defer s.requestCrawlMu.Unlock()
584
585 logger.Info("requesting crawl with configured relays")
586
587 if time.Now().Sub(s.lastRequestCrawl) <= 1*time.Minute {
588 return fmt.Errorf("a crawl request has already been made within the last minute")
589 }
590
591 for _, relay := range s.config.Relays {
592 logger := logger.With("relay", relay)
593 logger.Info("requesting crawl from relay")
594 cli := xrpc.Client{Host: relay}
595 if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{
596 Hostname: s.config.Hostname,
597 }); err != nil {
598 logger.Error("error requesting crawl", "err", err)
599 } else {
600 logger.Info("crawl requested successfully")
601 }
602 }
603
604 s.lastRequestCrawl = time.Now()
605
606 return nil
607}
608
609func (s *Server) doBackup() {
610 if s.dbType == "postgres" {
611 s.logger.Info("skipping S3 backup - PostgreSQL backups should be handled externally (pg_dump, managed database backups, etc.)")
612 return
613 }
614
615 start := time.Now()
616
617 s.logger.Info("beginning backup to s3...")
618
619 var buf bytes.Buffer
620 if err := func() error {
621 s.logger.Info("reading database bytes...")
622 s.db.Lock()
623 defer s.db.Unlock()
624
625 sf, err := os.Open(s.dbName)
626 if err != nil {
627 return fmt.Errorf("error opening database for backup: %w", err)
628 }
629 defer sf.Close()
630
631 if _, err := io.Copy(&buf, sf); err != nil {
632 return fmt.Errorf("error reading bytes of backup db: %w", err)
633 }
634
635 return nil
636 }(); err != nil {
637 s.logger.Error("error backing up database", "error", err)
638 return
639 }
640
641 if err := func() error {
642 s.logger.Info("sending to s3...")
643
644 currTime := time.Now().Format("2006-01-02_15-04-05")
645 key := "cocoon-backup-" + currTime + ".db"
646
647 config := &aws.Config{
648 Region: aws.String(s.s3Config.Region),
649 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
650 }
651
652 if s.s3Config.Endpoint != "" {
653 config.Endpoint = aws.String(s.s3Config.Endpoint)
654 config.S3ForcePathStyle = aws.Bool(true)
655 }
656
657 sess, err := session.NewSession(config)
658 if err != nil {
659 return err
660 }
661
662 svc := s3.New(sess)
663
664 if _, err := svc.PutObject(&s3.PutObjectInput{
665 Bucket: aws.String(s.s3Config.Bucket),
666 Key: aws.String(key),
667 Body: bytes.NewReader(buf.Bytes()),
668 }); err != nil {
669 return fmt.Errorf("error uploading file to s3: %w", err)
670 }
671
672 s.logger.Info("finished uploading backup to s3", "key", key, "duration", time.Now().Sub(start).Seconds())
673
674 return nil
675 }(); err != nil {
676 s.logger.Error("error uploading database backup", "error", err)
677 return
678 }
679
680 os.WriteFile("last-backup.txt", []byte(time.Now().String()), 0644)
681}
682
683func (s *Server) backupRoutine() {
684 if s.s3Config == nil || !s.s3Config.BackupsEnabled {
685 return
686 }
687
688 if s.s3Config.Region == "" {
689 s.logger.Warn("no s3 region configured but backups are enabled. backups will not run.")
690 return
691 }
692
693 if s.s3Config.Bucket == "" {
694 s.logger.Warn("no s3 bucket configured but backups are enabled. backups will not run.")
695 return
696 }
697
698 if s.s3Config.AccessKey == "" {
699 s.logger.Warn("no s3 access key configured but backups are enabled. backups will not run.")
700 return
701 }
702
703 if s.s3Config.SecretKey == "" {
704 s.logger.Warn("no s3 secret key configured but backups are enabled. backups will not run.")
705 return
706 }
707
708 shouldBackupNow := false
709 lastBackupStr, err := os.ReadFile("last-backup.txt")
710 if err != nil {
711 shouldBackupNow = true
712 } else {
713 lastBackup, err := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", string(lastBackupStr))
714 if err != nil {
715 shouldBackupNow = true
716 } else if time.Now().Sub(lastBackup).Seconds() > 3600 {
717 shouldBackupNow = true
718 }
719 }
720
721 if shouldBackupNow {
722 go s.doBackup()
723 }
724
725 ticker := time.NewTicker(time.Hour)
726 for range ticker.C {
727 go s.doBackup()
728 }
729}
730
731func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error {
732 if err := s.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil {
733 return err
734 }
735
736 return nil
737}