forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 18 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/indexer" 18 "tangled.org/core/appview/mentions" 19 "tangled.org/core/appview/models" 20 "tangled.org/core/appview/notify" 21 dbnotify "tangled.org/core/appview/notify/db" 22 phnotify "tangled.org/core/appview/notify/posthog" 23 "tangled.org/core/appview/oauth" 24 "tangled.org/core/appview/pages" 25 "tangled.org/core/appview/reporesolver" 26 "tangled.org/core/appview/validator" 27 xrpcclient "tangled.org/core/appview/xrpcclient" 28 "tangled.org/core/eventconsumer" 29 "tangled.org/core/idresolver" 30 "tangled.org/core/jetstream" 31 "tangled.org/core/log" 32 tlog "tangled.org/core/log" 33 "tangled.org/core/orm" 34 "tangled.org/core/rbac" 35 "tangled.org/core/tid" 36 37 comatproto "github.com/bluesky-social/indigo/api/atproto" 38 atpclient "github.com/bluesky-social/indigo/atproto/client" 39 "github.com/bluesky-social/indigo/atproto/syntax" 40 lexutil "github.com/bluesky-social/indigo/lex/util" 41 securejoin "github.com/cyphar/filepath-securejoin" 42 "github.com/go-chi/chi/v5" 43 "github.com/posthog/posthog-go" 44) 45 46type State struct { 47 db *db.DB 48 notifier notify.Notifier 49 indexer *indexer.Indexer 50 oauth *oauth.OAuth 51 enforcer *rbac.Enforcer 52 pages *pages.Pages 53 idResolver *idresolver.Resolver 54 mentionsResolver *mentions.Resolver 55 posthog posthog.Client 56 jc *jetstream.JetstreamClient 57 config *config.Config 58 repoResolver *reporesolver.RepoResolver 59 knotstream *eventconsumer.Consumer 60 spindlestream *eventconsumer.Consumer 61 logger *slog.Logger 62 validator *validator.Validator 63} 64 65func Make(ctx context.Context, config *config.Config) (*State, error) { 66 logger := tlog.FromContext(ctx) 67 68 d, err := db.Make(ctx, config.Core.DbPath) 69 if err != nil { 70 return nil, fmt.Errorf("failed to create db: %w", err) 71 } 72 73 indexer := indexer.New(log.SubLogger(logger, "indexer")) 74 err = indexer.Init(ctx, d) 75 if err != nil { 76 return nil, fmt.Errorf("failed to create indexer: %w", err) 77 } 78 79 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create enforcer: %w", err) 82 } 83 84 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 85 if err != nil { 86 logger.Error("failed to create redis resolver", "err", err) 87 res = idresolver.DefaultResolver(config.Plc.PLCURL) 88 } 89 90 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 91 if err != nil { 92 return nil, fmt.Errorf("failed to create posthog client: %w", err) 93 } 94 95 pages := pages.NewPages(config, res, log.SubLogger(logger, "pages")) 96 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 97 if err != nil { 98 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 99 } 100 validator := validator.New(d, res, enforcer) 101 102 repoResolver := reporesolver.New(config, enforcer, d) 103 104 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 105 106 wrapper := db.DbWrapper{Execer: d} 107 jc, err := jetstream.NewJetstreamClient( 108 config.Jetstream.Endpoint, 109 "appview", 110 []string{ 111 tangled.GraphFollowNSID, 112 tangled.FeedStarNSID, 113 tangled.PublicKeyNSID, 114 tangled.RepoArtifactNSID, 115 tangled.ActorProfileNSID, 116 tangled.SpindleMemberNSID, 117 tangled.SpindleNSID, 118 tangled.StringNSID, 119 tangled.RepoIssueNSID, 120 tangled.RepoIssueCommentNSID, 121 tangled.LabelDefinitionNSID, 122 tangled.LabelOpNSID, 123 }, 124 nil, 125 tlog.SubLogger(logger, "jetstream"), 126 wrapper, 127 false, 128 129 // in-memory filter is inapplicalble to appview so 130 // we'll never log dids anyway. 131 false, 132 ) 133 if err != nil { 134 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 135 } 136 137 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 138 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 139 } 140 141 ingester := appview.Ingester{ 142 Db: wrapper, 143 Enforcer: enforcer, 144 IdResolver: res, 145 Config: config, 146 Logger: log.SubLogger(logger, "ingester"), 147 Validator: validator, 148 } 149 err = jc.StartJetstream(ctx, ingester.Ingest()) 150 if err != nil { 151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 152 } 153 154 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 155 if err != nil { 156 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 157 } 158 knotstream.Start(ctx) 159 160 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 161 if err != nil { 162 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 163 } 164 spindlestream.Start(ctx) 165 166 var notifiers []notify.Notifier 167 168 // Always add the database notifier 169 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 170 171 // Add other notifiers in production only 172 if !config.Core.Dev { 173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 174 } 175 notifiers = append(notifiers, indexer) 176 notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify")) 177 178 state := &State{ 179 d, 180 notifier, 181 indexer, 182 oauth, 183 enforcer, 184 pages, 185 res, 186 mentionsResolver, 187 posthog, 188 jc, 189 config, 190 repoResolver, 191 knotstream, 192 spindlestream, 193 logger, 194 validator, 195 } 196 197 return state, nil 198} 199 200func (s *State) Close() error { 201 // other close up logic goes here 202 return s.db.Close() 203} 204 205func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 206 w.Header().Set("Content-Type", "image/svg+xml") 207 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 208 w.Header().Set("ETag", `"favicon-svg-v1"`) 209 210 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 211 w.WriteHeader(http.StatusNotModified) 212 return 213 } 214 215 s.pages.Favicon(w) 216} 217 218func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 219 w.Header().Set("Content-Type", "text/plain") 220 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 221 222 robotsTxt := `User-agent: * 223Allow: / 224` 225 w.Write([]byte(robotsTxt)) 226} 227 228// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest 229const manifestJson = `{ 230 "name": "tangled", 231 "description": "tightly-knit social coding.", 232 "icons": [ 233 { 234 "src": "/favicon.svg", 235 "sizes": "144x144" 236 } 237 ], 238 "start_url": "/", 239 "id": "org.tangled", 240 241 "display": "standalone", 242 "background_color": "#111827", 243 "theme_color": "#111827" 244}` 245 246func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) { 247 w.Header().Set("Content-Type", "application/json") 248 w.Write([]byte(manifestJson)) 249} 250 251func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 252 user := s.oauth.GetUser(r) 253 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 254 LoggedInUser: user, 255 }) 256} 257 258func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 259 user := s.oauth.GetUser(r) 260 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 261 LoggedInUser: user, 262 }) 263} 264 265func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 266 user := s.oauth.GetUser(r) 267 s.pages.Brand(w, pages.BrandParams{ 268 LoggedInUser: user, 269 }) 270} 271 272func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 273 if s.oauth.GetUser(r) != nil { 274 s.Timeline(w, r) 275 return 276 } 277 s.Home(w, r) 278} 279 280func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 281 user := s.oauth.GetUser(r) 282 283 // TODO: set this flag based on the UI 284 filtered := false 285 286 var userDid string 287 if user != nil { 288 userDid = user.Did 289 } 290 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 291 if err != nil { 292 s.logger.Error("failed to make timeline", "err", err) 293 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 294 } 295 296 repos, err := db.GetTopStarredReposLastWeek(s.db) 297 if err != nil { 298 s.logger.Error("failed to get top starred repos", "err", err) 299 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 300 return 301 } 302 303 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue)) 304 if err != nil { 305 // non-fatal 306 } 307 308 s.pages.Timeline(w, pages.TimelineParams{ 309 LoggedInUser: user, 310 Timeline: timeline, 311 Repos: repos, 312 GfiLabel: gfiLabel, 313 }) 314} 315 316func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 317 user := s.oauth.GetUser(r) 318 if user == nil { 319 return 320 } 321 322 l := s.logger.With("handler", "UpgradeBanner") 323 l = l.With("did", user.Did) 324 325 regs, err := db.GetRegistrations( 326 s.db, 327 orm.FilterEq("did", user.Did), 328 orm.FilterEq("needs_upgrade", 1), 329 ) 330 if err != nil { 331 l.Error("non-fatal: failed to get registrations", "err", err) 332 } 333 334 spindles, err := db.GetSpindles( 335 s.db, 336 orm.FilterEq("owner", user.Did), 337 orm.FilterEq("needs_upgrade", 1), 338 ) 339 if err != nil { 340 l.Error("non-fatal: failed to get spindles", "err", err) 341 } 342 343 if regs == nil && spindles == nil { 344 return 345 } 346 347 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 348 Registrations: regs, 349 Spindles: spindles, 350 }) 351} 352 353func (s *State) Home(w http.ResponseWriter, r *http.Request) { 354 // TODO: set this flag based on the UI 355 filtered := false 356 357 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 358 if err != nil { 359 s.logger.Error("failed to make timeline", "err", err) 360 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 361 return 362 } 363 364 repos, err := db.GetTopStarredReposLastWeek(s.db) 365 if err != nil { 366 s.logger.Error("failed to get top starred repos", "err", err) 367 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 368 return 369 } 370 371 s.pages.Home(w, pages.TimelineParams{ 372 LoggedInUser: nil, 373 Timeline: timeline, 374 Repos: repos, 375 }) 376} 377 378func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 379 user := chi.URLParam(r, "user") 380 user = strings.TrimPrefix(user, "@") 381 382 if user == "" { 383 w.WriteHeader(http.StatusBadRequest) 384 return 385 } 386 387 id, err := s.idResolver.ResolveIdent(r.Context(), user) 388 if err != nil { 389 w.WriteHeader(http.StatusInternalServerError) 390 return 391 } 392 393 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 394 if err != nil { 395 s.logger.Error("failed to get public keys", "err", err) 396 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 397 return 398 } 399 400 if len(pubKeys) == 0 { 401 w.WriteHeader(http.StatusNoContent) 402 return 403 } 404 405 for _, k := range pubKeys { 406 key := strings.TrimRight(k.Key, "\n") 407 fmt.Fprintln(w, key) 408 } 409} 410 411func validateRepoName(name string) error { 412 // check for path traversal attempts 413 if name == "." || name == ".." || 414 strings.Contains(name, "/") || strings.Contains(name, "\\") { 415 return fmt.Errorf("Repository name contains invalid path characters") 416 } 417 418 // check for sequences that could be used for traversal when normalized 419 if strings.Contains(name, "./") || strings.Contains(name, "../") || 420 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 421 return fmt.Errorf("Repository name contains invalid path sequence") 422 } 423 424 // then continue with character validation 425 for _, char := range name { 426 if !((char >= 'a' && char <= 'z') || 427 (char >= 'A' && char <= 'Z') || 428 (char >= '0' && char <= '9') || 429 char == '-' || char == '_' || char == '.') { 430 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 431 } 432 } 433 434 // additional check to prevent multiple sequential dots 435 if strings.Contains(name, "..") { 436 return fmt.Errorf("Repository name cannot contain sequential dots") 437 } 438 439 // if all checks pass 440 return nil 441} 442 443func stripGitExt(name string) string { 444 return strings.TrimSuffix(name, ".git") 445} 446 447func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 448 switch r.Method { 449 case http.MethodGet: 450 user := s.oauth.GetUser(r) 451 knots, err := s.enforcer.GetKnotsForUser(user.Did) 452 if err != nil { 453 s.pages.Notice(w, "repo", "Invalid user account.") 454 return 455 } 456 457 s.pages.NewRepo(w, pages.NewRepoParams{ 458 LoggedInUser: user, 459 Knots: knots, 460 }) 461 462 case http.MethodPost: 463 l := s.logger.With("handler", "NewRepo") 464 465 user := s.oauth.GetUser(r) 466 l = l.With("did", user.Did) 467 468 // form validation 469 domain := r.FormValue("domain") 470 if domain == "" { 471 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 472 return 473 } 474 l = l.With("knot", domain) 475 476 repoName := r.FormValue("name") 477 if repoName == "" { 478 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 479 return 480 } 481 482 if err := validateRepoName(repoName); err != nil { 483 s.pages.Notice(w, "repo", err.Error()) 484 return 485 } 486 repoName = stripGitExt(repoName) 487 l = l.With("repoName", repoName) 488 489 defaultBranch := r.FormValue("branch") 490 if defaultBranch == "" { 491 defaultBranch = "main" 492 } 493 l = l.With("defaultBranch", defaultBranch) 494 495 description := r.FormValue("description") 496 497 // ACL validation 498 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 499 if err != nil || !ok { 500 l.Info("unauthorized") 501 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 502 return 503 } 504 505 // Check for existing repos 506 existingRepo, err := db.GetRepo( 507 s.db, 508 orm.FilterEq("did", user.Did), 509 orm.FilterEq("name", repoName), 510 ) 511 if err == nil && existingRepo != nil { 512 l.Info("repo exists") 513 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 514 return 515 } 516 517 // create atproto record for this repo 518 rkey := tid.TID() 519 repo := &models.Repo{ 520 Did: user.Did, 521 Name: repoName, 522 Knot: domain, 523 Rkey: rkey, 524 Description: description, 525 Created: time.Now(), 526 Labels: s.config.Label.DefaultLabelDefs, 527 } 528 record := repo.AsRecord() 529 530 atpClient, err := s.oauth.AuthorizedClient(r) 531 if err != nil { 532 l.Info("PDS write failed", "err", err) 533 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 534 return 535 } 536 537 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 538 Collection: tangled.RepoNSID, 539 Repo: user.Did, 540 Rkey: rkey, 541 Record: &lexutil.LexiconTypeDecoder{ 542 Val: &record, 543 }, 544 }) 545 if err != nil { 546 l.Info("PDS write failed", "err", err) 547 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 548 return 549 } 550 551 aturi := atresp.Uri 552 l = l.With("aturi", aturi) 553 l.Info("wrote to PDS") 554 555 tx, err := s.db.BeginTx(r.Context(), nil) 556 if err != nil { 557 l.Info("txn failed", "err", err) 558 s.pages.Notice(w, "repo", "Failed to save repository information.") 559 return 560 } 561 562 // The rollback function reverts a few things on failure: 563 // - the pending txn 564 // - the ACLs 565 // - the atproto record created 566 rollback := func() { 567 err1 := tx.Rollback() 568 err2 := s.enforcer.E.LoadPolicy() 569 err3 := rollbackRecord(context.Background(), aturi, atpClient) 570 571 // ignore txn complete errors, this is okay 572 if errors.Is(err1, sql.ErrTxDone) { 573 err1 = nil 574 } 575 576 if errs := errors.Join(err1, err2, err3); errs != nil { 577 l.Error("failed to rollback changes", "errs", errs) 578 return 579 } 580 } 581 defer rollback() 582 583 client, err := s.oauth.ServiceClient( 584 r, 585 oauth.WithService(domain), 586 oauth.WithLxm(tangled.RepoCreateNSID), 587 oauth.WithDev(s.config.Core.Dev), 588 ) 589 if err != nil { 590 l.Error("service auth failed", "err", err) 591 s.pages.Notice(w, "repo", "Failed to reach PDS.") 592 return 593 } 594 595 xe := tangled.RepoCreate( 596 r.Context(), 597 client, 598 &tangled.RepoCreate_Input{ 599 Rkey: rkey, 600 }, 601 ) 602 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 603 l.Error("xrpc error", "xe", xe) 604 s.pages.Notice(w, "repo", err.Error()) 605 return 606 } 607 608 err = db.AddRepo(tx, repo) 609 if err != nil { 610 l.Error("db write failed", "err", err) 611 s.pages.Notice(w, "repo", "Failed to save repository information.") 612 return 613 } 614 615 // acls 616 p, _ := securejoin.SecureJoin(user.Did, repoName) 617 err = s.enforcer.AddRepo(user.Did, domain, p) 618 if err != nil { 619 l.Error("acl setup failed", "err", err) 620 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 621 return 622 } 623 624 err = tx.Commit() 625 if err != nil { 626 l.Error("txn commit failed", "err", err) 627 http.Error(w, err.Error(), http.StatusInternalServerError) 628 return 629 } 630 631 err = s.enforcer.E.SavePolicy() 632 if err != nil { 633 l.Error("acl save failed", "err", err) 634 http.Error(w, err.Error(), http.StatusInternalServerError) 635 return 636 } 637 638 // reset the ATURI because the transaction completed successfully 639 aturi = "" 640 641 s.notifier.NewRepo(r.Context(), repo) 642 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, repoName)) 643 } 644} 645 646// this is used to rollback changes made to the PDS 647// 648// it is a no-op if the provided ATURI is empty 649func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 650 if aturi == "" { 651 return nil 652 } 653 654 parsed := syntax.ATURI(aturi) 655 656 collection := parsed.Collection().String() 657 repo := parsed.Authority().String() 658 rkey := parsed.RecordKey().String() 659 660 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 661 Collection: collection, 662 Repo: repo, 663 Rkey: rkey, 664 }) 665 return err 666} 667 668func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 669 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 670 if err != nil { 671 return err 672 } 673 // already present 674 if len(defaultLabels) == len(defaults) { 675 return nil 676 } 677 678 labelDefs, err := models.FetchLabelDefs(r, defaults) 679 if err != nil { 680 return err 681 } 682 683 // Insert each label definition to the database 684 for _, labelDef := range labelDefs { 685 _, err = db.AddLabelDefinition(e, &labelDef) 686 if err != nil { 687 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 688 } 689 } 690 691 return nil 692}