// Forked from tangled.org/core.
// Monorepo for Tangled — https://tangled.org

1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/indexer" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/notify" 20 dbnotify "tangled.org/core/appview/notify/db" 21 phnotify "tangled.org/core/appview/notify/posthog" 22 "tangled.org/core/appview/oauth" 23 "tangled.org/core/appview/pages" 24 "tangled.org/core/appview/reporesolver" 25 "tangled.org/core/appview/validator" 26 xrpcclient "tangled.org/core/appview/xrpcclient" 27 "tangled.org/core/eventconsumer" 28 "tangled.org/core/idresolver" 29 "tangled.org/core/jetstream" 30 "tangled.org/core/log" 31 tlog "tangled.org/core/log" 32 "tangled.org/core/rbac" 33 "tangled.org/core/tid" 34 35 comatproto "github.com/bluesky-social/indigo/api/atproto" 36 atpclient "github.com/bluesky-social/indigo/atproto/client" 37 "github.com/bluesky-social/indigo/atproto/syntax" 38 lexutil "github.com/bluesky-social/indigo/lex/util" 39 securejoin "github.com/cyphar/filepath-securejoin" 40 "github.com/go-chi/chi/v5" 41 "github.com/posthog/posthog-go" 42) 43 44type State struct { 45 db *db.DB 46 notifier notify.Notifier 47 indexer *indexer.Indexer 48 oauth *oauth.OAuth 49 enforcer *rbac.Enforcer 50 pages *pages.Pages 51 idResolver *idresolver.Resolver 52 posthog posthog.Client 53 jc *jetstream.JetstreamClient 54 config *config.Config 55 repoResolver *reporesolver.RepoResolver 56 knotstream *eventconsumer.Consumer 57 spindlestream *eventconsumer.Consumer 58 logger *slog.Logger 59 validator *validator.Validator 60} 61 62func Make(ctx context.Context, config *config.Config) (*State, error) { 63 logger := tlog.FromContext(ctx) 64 65 d, err := db.Make(ctx, config.Core.DbPath) 66 if err != nil { 67 return nil, fmt.Errorf("failed to create db: %w", err) 68 } 69 70 indexer := 
indexer.New(log.SubLogger(logger, "indexer")) 71 err = indexer.Init(ctx, d) 72 if err != nil { 73 return nil, fmt.Errorf("failed to create indexer: %w", err) 74 } 75 76 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 77 if err != nil { 78 return nil, fmt.Errorf("failed to create enforcer: %w", err) 79 } 80 81 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 82 if err != nil { 83 logger.Error("failed to create redis resolver", "err", err) 84 res = idresolver.DefaultResolver() 85 } 86 87 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 88 if err != nil { 89 return nil, fmt.Errorf("failed to create posthog client: %w", err) 90 } 91 92 pages := pages.NewPages(config, res, log.SubLogger(logger, "pages")) 93 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 94 if err != nil { 95 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 96 } 97 validator := validator.New(d, res, enforcer) 98 99 repoResolver := reporesolver.New(config, enforcer, res, d) 100 101 wrapper := db.DbWrapper{Execer: d} 102 jc, err := jetstream.NewJetstreamClient( 103 config.Jetstream.Endpoint, 104 "appview", 105 []string{ 106 tangled.GraphFollowNSID, 107 tangled.FeedStarNSID, 108 tangled.PublicKeyNSID, 109 tangled.RepoArtifactNSID, 110 tangled.ActorProfileNSID, 111 tangled.SpindleMemberNSID, 112 tangled.SpindleNSID, 113 tangled.StringNSID, 114 tangled.RepoIssueNSID, 115 tangled.RepoIssueCommentNSID, 116 tangled.LabelDefinitionNSID, 117 tangled.LabelOpNSID, 118 }, 119 nil, 120 tlog.SubLogger(logger, "jetstream"), 121 wrapper, 122 false, 123 124 // in-memory filter is inapplicalble to appview so 125 // we'll never log dids anyway. 
126 false, 127 ) 128 if err != nil { 129 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 130 } 131 132 if err := BackfillDefaultDefs(d, res); err != nil { 133 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 134 } 135 136 ingester := appview.Ingester{ 137 Db: wrapper, 138 Enforcer: enforcer, 139 IdResolver: res, 140 Config: config, 141 Logger: log.SubLogger(logger, "ingester"), 142 Validator: validator, 143 } 144 err = jc.StartJetstream(ctx, ingester.Ingest()) 145 if err != nil { 146 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 147 } 148 149 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 150 if err != nil { 151 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 152 } 153 knotstream.Start(ctx) 154 155 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 156 if err != nil { 157 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 158 } 159 spindlestream.Start(ctx) 160 161 var notifiers []notify.Notifier 162 163 // Always add the database notifier 164 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 165 166 // Add other notifiers in production only 167 if !config.Core.Dev { 168 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 169 } 170 notifiers = append(notifiers, indexer) 171 notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify")) 172 173 state := &State{ 174 d, 175 notifier, 176 indexer, 177 oauth, 178 enforcer, 179 pages, 180 res, 181 posthog, 182 jc, 183 config, 184 repoResolver, 185 knotstream, 186 spindlestream, 187 logger, 188 validator, 189 } 190 191 return state, nil 192} 193 194func (s *State) Close() error { 195 // other close up logic goes here 196 return s.db.Close() 197} 198 199func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 200 w.Header().Set("Content-Type", "image/svg+xml") 201 w.Header().Set("Cache-Control", "public, max-age=31536000") 
// one year 202 w.Header().Set("ETag", `"favicon-svg-v1"`) 203 204 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 205 w.WriteHeader(http.StatusNotModified) 206 return 207 } 208 209 s.pages.Favicon(w) 210} 211 212func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 213 w.Header().Set("Content-Type", "text/plain") 214 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 215 216 robotsTxt := `User-agent: * 217Allow: / 218` 219 w.Write([]byte(robotsTxt)) 220} 221 222// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest 223const manifestJson = `{ 224 "name": "tangled", 225 "description": "tightly-knit social coding.", 226 "icons": [ 227 { 228 "src": "/favicon.svg", 229 "sizes": "144x144" 230 } 231 ], 232 "start_url": "/", 233 "id": "org.tangled", 234 235 "display": "standalone", 236 "background_color": "#111827", 237 "theme_color": "#111827" 238}` 239 240func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) { 241 w.Header().Set("Content-Type", "application/json") 242 w.Write([]byte(manifestJson)) 243} 244 245func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 246 user := s.oauth.GetUser(r) 247 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 248 LoggedInUser: user, 249 }) 250} 251 252func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 253 user := s.oauth.GetUser(r) 254 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 255 LoggedInUser: user, 256 }) 257} 258 259func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 260 user := s.oauth.GetUser(r) 261 s.pages.Brand(w, pages.BrandParams{ 262 LoggedInUser: user, 263 }) 264} 265 266func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 267 if s.oauth.GetUser(r) != nil { 268 s.Timeline(w, r) 269 return 270 } 271 s.Home(w, r) 272} 273 274func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 275 user := s.oauth.GetUser(r) 276 277 // TODO: set this 
flag based on the UI 278 filtered := false 279 280 var userDid string 281 if user != nil { 282 userDid = user.Did 283 } 284 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 285 if err != nil { 286 s.logger.Error("failed to make timeline", "err", err) 287 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 288 } 289 290 repos, err := db.GetTopStarredReposLastWeek(s.db) 291 if err != nil { 292 s.logger.Error("failed to get top starred repos", "err", err) 293 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 294 return 295 } 296 297 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", models.LabelGoodFirstIssue)) 298 if err != nil { 299 // non-fatal 300 } 301 302 s.pages.Timeline(w, pages.TimelineParams{ 303 LoggedInUser: user, 304 Timeline: timeline, 305 Repos: repos, 306 GfiLabel: gfiLabel, 307 }) 308} 309 310func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 311 user := s.oauth.GetUser(r) 312 if user == nil { 313 return 314 } 315 316 l := s.logger.With("handler", "UpgradeBanner") 317 l = l.With("did", user.Did) 318 319 regs, err := db.GetRegistrations( 320 s.db, 321 db.FilterEq("did", user.Did), 322 db.FilterEq("needs_upgrade", 1), 323 ) 324 if err != nil { 325 l.Error("non-fatal: failed to get registrations", "err", err) 326 } 327 328 spindles, err := db.GetSpindles( 329 s.db, 330 db.FilterEq("owner", user.Did), 331 db.FilterEq("needs_upgrade", 1), 332 ) 333 if err != nil { 334 l.Error("non-fatal: failed to get spindles", "err", err) 335 } 336 337 if regs == nil && spindles == nil { 338 return 339 } 340 341 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 342 Registrations: regs, 343 Spindles: spindles, 344 }) 345} 346 347func (s *State) Home(w http.ResponseWriter, r *http.Request) { 348 // TODO: set this flag based on the UI 349 filtered := false 350 351 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 352 if err != nil { 353 s.logger.Error("failed to make timeline", "err", err) 354 
s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 355 return 356 } 357 358 repos, err := db.GetTopStarredReposLastWeek(s.db) 359 if err != nil { 360 s.logger.Error("failed to get top starred repos", "err", err) 361 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 362 return 363 } 364 365 s.pages.Home(w, pages.TimelineParams{ 366 LoggedInUser: nil, 367 Timeline: timeline, 368 Repos: repos, 369 }) 370} 371 372func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 373 user := chi.URLParam(r, "user") 374 user = strings.TrimPrefix(user, "@") 375 376 if user == "" { 377 w.WriteHeader(http.StatusBadRequest) 378 return 379 } 380 381 id, err := s.idResolver.ResolveIdent(r.Context(), user) 382 if err != nil { 383 w.WriteHeader(http.StatusInternalServerError) 384 return 385 } 386 387 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 388 if err != nil { 389 w.WriteHeader(http.StatusNotFound) 390 return 391 } 392 393 if len(pubKeys) == 0 { 394 w.WriteHeader(http.StatusNotFound) 395 return 396 } 397 398 for _, k := range pubKeys { 399 key := strings.TrimRight(k.Key, "\n") 400 fmt.Fprintln(w, key) 401 } 402} 403 404func validateRepoName(name string) error { 405 // check for path traversal attempts 406 if name == "." || name == ".." 
|| 407 strings.Contains(name, "/") || strings.Contains(name, "\\") { 408 return fmt.Errorf("Repository name contains invalid path characters") 409 } 410 411 // check for sequences that could be used for traversal when normalized 412 if strings.Contains(name, "./") || strings.Contains(name, "../") || 413 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 414 return fmt.Errorf("Repository name contains invalid path sequence") 415 } 416 417 // then continue with character validation 418 for _, char := range name { 419 if !((char >= 'a' && char <= 'z') || 420 (char >= 'A' && char <= 'Z') || 421 (char >= '0' && char <= '9') || 422 char == '-' || char == '_' || char == '.') { 423 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 424 } 425 } 426 427 // additional check to prevent multiple sequential dots 428 if strings.Contains(name, "..") { 429 return fmt.Errorf("Repository name cannot contain sequential dots") 430 } 431 432 // if all checks pass 433 return nil 434} 435 436func stripGitExt(name string) string { 437 return strings.TrimSuffix(name, ".git") 438} 439 440func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 441 switch r.Method { 442 case http.MethodGet: 443 user := s.oauth.GetUser(r) 444 knots, err := s.enforcer.GetKnotsForUser(user.Did) 445 if err != nil { 446 s.pages.Notice(w, "repo", "Invalid user account.") 447 return 448 } 449 450 s.pages.NewRepo(w, pages.NewRepoParams{ 451 LoggedInUser: user, 452 Knots: knots, 453 }) 454 455 case http.MethodPost: 456 l := s.logger.With("handler", "NewRepo") 457 458 user := s.oauth.GetUser(r) 459 l = l.With("did", user.Did) 460 461 // form validation 462 domain := r.FormValue("domain") 463 if domain == "" { 464 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 465 return 466 } 467 l = l.With("knot", domain) 468 469 repoName := r.FormValue("name") 470 if repoName == "" { 471 s.pages.Notice(w, "repo", 
"Repository name cannot be empty.") 472 return 473 } 474 475 if err := validateRepoName(repoName); err != nil { 476 s.pages.Notice(w, "repo", err.Error()) 477 return 478 } 479 repoName = stripGitExt(repoName) 480 l = l.With("repoName", repoName) 481 482 defaultBranch := r.FormValue("branch") 483 if defaultBranch == "" { 484 defaultBranch = "main" 485 } 486 l = l.With("defaultBranch", defaultBranch) 487 488 description := r.FormValue("description") 489 490 // ACL validation 491 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 492 if err != nil || !ok { 493 l.Info("unauthorized") 494 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 495 return 496 } 497 498 // Check for existing repos 499 existingRepo, err := db.GetRepo( 500 s.db, 501 db.FilterEq("did", user.Did), 502 db.FilterEq("name", repoName), 503 ) 504 if err == nil && existingRepo != nil { 505 l.Info("repo exists") 506 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 507 return 508 } 509 510 // create atproto record for this repo 511 rkey := tid.TID() 512 repo := &models.Repo{ 513 Did: user.Did, 514 Name: repoName, 515 Knot: domain, 516 Rkey: rkey, 517 Description: description, 518 Created: time.Now(), 519 Labels: models.DefaultLabelDefs(), 520 } 521 record := repo.AsRecord() 522 523 atpClient, err := s.oauth.AuthorizedClient(r) 524 if err != nil { 525 l.Info("PDS write failed", "err", err) 526 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 527 return 528 } 529 530 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 531 Collection: tangled.RepoNSID, 532 Repo: user.Did, 533 Rkey: rkey, 534 Record: &lexutil.LexiconTypeDecoder{ 535 Val: &record, 536 }, 537 }) 538 if err != nil { 539 l.Info("PDS write failed", "err", err) 540 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 541 return 542 } 543 544 aturi := atresp.Uri 
545 l = l.With("aturi", aturi) 546 l.Info("wrote to PDS") 547 548 tx, err := s.db.BeginTx(r.Context(), nil) 549 if err != nil { 550 l.Info("txn failed", "err", err) 551 s.pages.Notice(w, "repo", "Failed to save repository information.") 552 return 553 } 554 555 // The rollback function reverts a few things on failure: 556 // - the pending txn 557 // - the ACLs 558 // - the atproto record created 559 rollback := func() { 560 err1 := tx.Rollback() 561 err2 := s.enforcer.E.LoadPolicy() 562 err3 := rollbackRecord(context.Background(), aturi, atpClient) 563 564 // ignore txn complete errors, this is okay 565 if errors.Is(err1, sql.ErrTxDone) { 566 err1 = nil 567 } 568 569 if errs := errors.Join(err1, err2, err3); errs != nil { 570 l.Error("failed to rollback changes", "errs", errs) 571 return 572 } 573 } 574 defer rollback() 575 576 client, err := s.oauth.ServiceClient( 577 r, 578 oauth.WithService(domain), 579 oauth.WithLxm(tangled.RepoCreateNSID), 580 oauth.WithDev(s.config.Core.Dev), 581 ) 582 if err != nil { 583 l.Error("service auth failed", "err", err) 584 s.pages.Notice(w, "repo", "Failed to reach PDS.") 585 return 586 } 587 588 xe := tangled.RepoCreate( 589 r.Context(), 590 client, 591 &tangled.RepoCreate_Input{ 592 Rkey: rkey, 593 }, 594 ) 595 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 596 l.Error("xrpc error", "xe", xe) 597 s.pages.Notice(w, "repo", err.Error()) 598 return 599 } 600 601 err = db.AddRepo(tx, repo) 602 if err != nil { 603 l.Error("db write failed", "err", err) 604 s.pages.Notice(w, "repo", "Failed to save repository information.") 605 return 606 } 607 608 // acls 609 p, _ := securejoin.SecureJoin(user.Did, repoName) 610 err = s.enforcer.AddRepo(user.Did, domain, p) 611 if err != nil { 612 l.Error("acl setup failed", "err", err) 613 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 614 return 615 } 616 617 err = tx.Commit() 618 if err != nil { 619 l.Error("txn commit failed", "err", err) 620 http.Error(w, 
err.Error(), http.StatusInternalServerError) 621 return 622 } 623 624 err = s.enforcer.E.SavePolicy() 625 if err != nil { 626 l.Error("acl save failed", "err", err) 627 http.Error(w, err.Error(), http.StatusInternalServerError) 628 return 629 } 630 631 // reset the ATURI because the transaction completed successfully 632 aturi = "" 633 634 s.notifier.NewRepo(r.Context(), repo) 635 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Did, repoName)) 636 } 637} 638 639// this is used to rollback changes made to the PDS 640// 641// it is a no-op if the provided ATURI is empty 642func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 643 if aturi == "" { 644 return nil 645 } 646 647 parsed := syntax.ATURI(aturi) 648 649 collection := parsed.Collection().String() 650 repo := parsed.Authority().String() 651 rkey := parsed.RecordKey().String() 652 653 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 654 Collection: collection, 655 Repo: repo, 656 Rkey: rkey, 657 }) 658 return err 659} 660 661func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error { 662 defaults := models.DefaultLabelDefs() 663 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults)) 664 if err != nil { 665 return err 666 } 667 // already present 668 if len(defaultLabels) == len(defaults) { 669 return nil 670 } 671 672 labelDefs, err := models.FetchDefaultDefs(r) 673 if err != nil { 674 return err 675 } 676 677 // Insert each label definition to the database 678 for _, labelDef := range labelDefs { 679 _, err = db.AddLabelDefinition(e, &labelDef) 680 if err != nil { 681 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 682 } 683 } 684 685 return nil 686}