forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/appview" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cache/session" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 dbnotify "tangled.org/core/appview/notify/db" 23 phnotify "tangled.org/core/appview/notify/posthog" 24 "tangled.org/core/appview/oauth" 25 "tangled.org/core/appview/pages" 26 "tangled.org/core/appview/reporesolver" 27 "tangled.org/core/appview/validator" 28 xrpcclient "tangled.org/core/appview/xrpcclient" 29 "tangled.org/core/eventconsumer" 30 "tangled.org/core/idresolver" 31 "tangled.org/core/jetstream" 32 tlog "tangled.org/core/log" 33 "tangled.org/core/rbac" 34 "tangled.org/core/tid" 35 36 comatproto "github.com/bluesky-social/indigo/api/atproto" 37 atpclient "github.com/bluesky-social/indigo/atproto/client" 38 "github.com/bluesky-social/indigo/atproto/syntax" 39 lexutil "github.com/bluesky-social/indigo/lex/util" 40 securejoin "github.com/cyphar/filepath-securejoin" 41 "github.com/go-chi/chi/v5" 42 "github.com/posthog/posthog-go" 43) 44 45type State struct { 46 db *db.DB 47 notifier notify.Notifier 48 oauth *oauth.OAuth 49 enforcer *rbac.Enforcer 50 pages *pages.Pages 51 sess *session.SessionStore 52 idResolver *idresolver.Resolver 53 posthog posthog.Client 54 jc *jetstream.JetstreamClient 55 config *config.Config 56 repoResolver *reporesolver.RepoResolver 57 knotstream *eventconsumer.Consumer 58 spindlestream *eventconsumer.Consumer 59 logger *slog.Logger 60 validator *validator.Validator 61} 62 63func Make(ctx context.Context, config *config.Config) (*State, error) { 64 d, err := db.Make(config.Core.DbPath) 65 if err != nil { 66 return nil, fmt.Errorf("failed to create db: %w", err) 67 } 68 69 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 70 if err != nil { 71 return nil, fmt.Errorf("failed to create enforcer: %w", err) 72 } 73 74 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 75 if err != nil { 76 log.Printf("failed to create redis resolver: %v", err) 77 res = idresolver.DefaultResolver() 78 } 79 80 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 81 if err != nil { 82 return nil, fmt.Errorf("failed to create posthog client: %w", err) 83 } 84 85 pages := pages.NewPages(config, res) 86 cache := cache.New(config.Redis.Addr) 87 sess := session.New(cache) 88 oauth2, err := oauth.New(config, posthog, d, enforcer, res) 89 if err != nil { 90 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 91 } 92 validator := validator.New(d, res, enforcer) 93 94 repoResolver := reporesolver.New(config, enforcer, res, d) 95 96 wrapper := db.DbWrapper{Execer: d} 97 jc, err := jetstream.NewJetstreamClient( 98 config.Jetstream.Endpoint, 99 "appview", 100 []string{ 101 tangled.GraphFollowNSID, 102 tangled.FeedStarNSID, 103 tangled.PublicKeyNSID, 104 tangled.RepoArtifactNSID, 105 tangled.ActorProfileNSID, 106 tangled.SpindleMemberNSID, 107 tangled.SpindleNSID, 108 tangled.StringNSID, 109 tangled.RepoIssueNSID, 110 tangled.RepoIssueCommentNSID, 111 tangled.LabelDefinitionNSID, 112 tangled.LabelOpNSID, 113 }, 114 nil, 115 slog.Default(), 116 wrapper, 117 false, 118 119 // in-memory filter is inapplicalble to appview so 120 // we'll never log dids anyway. 121 false, 122 ) 123 if err != nil { 124 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 125 } 126 127 if err := BackfillDefaultDefs(d, res); err != nil { 128 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 129 } 130 131 ingester := appview.Ingester{ 132 Db: wrapper, 133 Enforcer: enforcer, 134 IdResolver: res, 135 Config: config, 136 Logger: tlog.New("ingester"), 137 Validator: validator, 138 } 139 err = jc.StartJetstream(ctx, ingester.Ingest()) 140 if err != nil { 141 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 142 } 143 144 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 145 if err != nil { 146 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 147 } 148 knotstream.Start(ctx) 149 150 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 151 if err != nil { 152 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 153 } 154 spindlestream.Start(ctx) 155 156 var notifiers []notify.Notifier 157 158 // Always add the database notifier 159 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 160 161 // Add other notifiers in production only 162 if !config.Core.Dev { 163 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 164 } 165 notifier := notify.NewMergedNotifier(notifiers...) 166 167 state := &State{ 168 d, 169 notifier, 170 oauth2, 171 enforcer, 172 pages, 173 sess, 174 res, 175 posthog, 176 jc, 177 config, 178 repoResolver, 179 knotstream, 180 spindlestream, 181 slog.Default(), 182 validator, 183 } 184 185 return state, nil 186} 187 188func (s *State) Close() error { 189 // other close up logic goes here 190 return s.db.Close() 191} 192 193func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 194 w.Header().Set("Content-Type", "image/svg+xml") 195 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 196 w.Header().Set("ETag", `"favicon-svg-v1"`) 197 198 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 199 w.WriteHeader(http.StatusNotModified) 200 return 201 } 202 203 s.pages.Favicon(w) 204} 205 206func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 207 w.Header().Set("Content-Type", "text/plain") 208 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 209 210 robotsTxt := `User-agent: * 211Allow: / 212` 213 w.Write([]byte(robotsTxt)) 214} 215 216// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest 217const manifestJson = `{ 218 "name": "tangled", 219 "description": "tightly-knit social coding.", 220 "icons": [ 221 { 222 "src": "/favicon.svg", 223 "sizes": "144x144" 224 } 225 ], 226 "start_url": "/", 227 "id": "org.tangled", 228 229 "display": "standalone", 230 "background_color": "#111827", 231 "theme_color": "#111827" 232}` 233 234func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) { 235 w.Header().Set("Content-Type", "application/json") 236 w.Write([]byte(manifestJson)) 237} 238 239func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 240 user := s.oauth.GetUser(r) 241 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 242 LoggedInUser: user, 243 }) 244} 245 246func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 247 user := s.oauth.GetUser(r) 248 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 249 LoggedInUser: user, 250 }) 251} 252 253func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 254 user := s.oauth.GetUser(r) 255 s.pages.Brand(w, pages.BrandParams{ 256 LoggedInUser: user, 257 }) 258} 259 260func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 261 if s.oauth.GetUser(r) != nil { 262 s.Timeline(w, r) 263 return 264 } 265 s.Home(w, r) 266} 267 268func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 269 user := s.oauth.GetUser(r) 270 271 // TODO: set this flag based on the UI 272 filtered := false 273 274 var userDid string 275 if user != nil { 276 userDid = user.Did 277 } 278 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 279 if err != nil { 280 log.Println(err) 281 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 282 } 283 284 repos, err := db.GetTopStarredReposLastWeek(s.db) 285 if err != nil { 286 log.Println(err) 287 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 288 return 289 } 290 291 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", models.LabelGoodFirstIssue)) 292 if err != nil { 293 // non-fatal 294 } 295 296 s.pages.Timeline(w, pages.TimelineParams{ 297 LoggedInUser: user, 298 Timeline: timeline, 299 Repos: repos, 300 GfiLabel: gfiLabel, 301 }) 302} 303 304func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 305 user := s.oauth.GetUser(r) 306 if user == nil { 307 return 308 } 309 310 l := s.logger.With("handler", "UpgradeBanner") 311 l = l.With("did", user.Did) 312 313 regs, err := db.GetRegistrations( 314 s.db, 315 db.FilterEq("did", user.Did), 316 db.FilterEq("needs_upgrade", 1), 317 ) 318 if err != nil { 319 l.Error("non-fatal: failed to get registrations", "err", err) 320 } 321 322 spindles, err := db.GetSpindles( 323 s.db, 324 db.FilterEq("owner", user.Did), 325 db.FilterEq("needs_upgrade", 1), 326 ) 327 if err != nil { 328 l.Error("non-fatal: failed to get spindles", "err", err) 329 } 330 331 if regs == nil && spindles == nil { 332 return 333 } 334 335 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 336 Registrations: regs, 337 Spindles: spindles, 338 }) 339} 340 341func (s *State) Home(w http.ResponseWriter, r *http.Request) { 342 // TODO: set this flag based on the UI 343 filtered := false 344 345 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 346 if err != nil { 347 log.Println(err) 348 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 349 return 350 } 351 352 repos, err := db.GetTopStarredReposLastWeek(s.db) 353 if err != nil { 354 log.Println(err) 355 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 356 return 357 } 358 359 s.pages.Home(w, pages.TimelineParams{ 360 LoggedInUser: nil, 361 Timeline: timeline, 362 Repos: repos, 363 }) 364} 365 366func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 367 user := chi.URLParam(r, "user") 368 user = strings.TrimPrefix(user, "@") 369 370 if user == "" { 371 w.WriteHeader(http.StatusBadRequest) 372 return 373 } 374 375 id, err := s.idResolver.ResolveIdent(r.Context(), user) 376 if err != nil { 377 w.WriteHeader(http.StatusInternalServerError) 378 return 379 } 380 381 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 382 if err != nil { 383 w.WriteHeader(http.StatusNotFound) 384 return 385 } 386 387 if len(pubKeys) == 0 { 388 w.WriteHeader(http.StatusNotFound) 389 return 390 } 391 392 for _, k := range pubKeys { 393 key := strings.TrimRight(k.Key, "\n") 394 fmt.Fprintln(w, key) 395 } 396} 397 398func validateRepoName(name string) error { 399 // check for path traversal attempts 400 if name == "." || name == ".." || 401 strings.Contains(name, "/") || strings.Contains(name, "\\") { 402 return fmt.Errorf("Repository name contains invalid path characters") 403 } 404 405 // check for sequences that could be used for traversal when normalized 406 if strings.Contains(name, "./") || strings.Contains(name, "../") || 407 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 408 return fmt.Errorf("Repository name contains invalid path sequence") 409 } 410 411 // then continue with character validation 412 for _, char := range name { 413 if !((char >= 'a' && char <= 'z') || 414 (char >= 'A' && char <= 'Z') || 415 (char >= '0' && char <= '9') || 416 char == '-' || char == '_' || char == '.') { 417 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 418 } 419 } 420 421 // additional check to prevent multiple sequential dots 422 if strings.Contains(name, "..") { 423 return fmt.Errorf("Repository name cannot contain sequential dots") 424 } 425 426 // if all checks pass 427 return nil 428} 429 430func stripGitExt(name string) string { 431 return strings.TrimSuffix(name, ".git") 432} 433 434func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 435 switch r.Method { 436 case http.MethodGet: 437 user := s.oauth.GetUser(r) 438 knots, err := s.enforcer.GetKnotsForUser(user.Did) 439 if err != nil { 440 s.pages.Notice(w, "repo", "Invalid user account.") 441 return 442 } 443 444 s.pages.NewRepo(w, pages.NewRepoParams{ 445 LoggedInUser: user, 446 Knots: knots, 447 }) 448 449 case http.MethodPost: 450 l := s.logger.With("handler", "NewRepo") 451 452 user := s.oauth.GetUser(r) 453 l = l.With("did", user.Did) 454 455 // form validation 456 domain := r.FormValue("domain") 457 if domain == "" { 458 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 459 return 460 } 461 l = l.With("knot", domain) 462 463 repoName := r.FormValue("name") 464 if repoName == "" { 465 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 466 return 467 } 468 469 if err := validateRepoName(repoName); err != nil { 470 s.pages.Notice(w, "repo", err.Error()) 471 return 472 } 473 repoName = stripGitExt(repoName) 474 l = l.With("repoName", repoName) 475 476 defaultBranch := r.FormValue("branch") 477 if defaultBranch == "" { 478 defaultBranch = "main" 479 } 480 l = l.With("defaultBranch", defaultBranch) 481 482 description := r.FormValue("description") 483 484 // ACL validation 485 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 486 if err != nil || !ok { 487 l.Info("unauthorized") 488 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 489 return 490 } 491 492 // Check for existing repos 493 existingRepo, err := db.GetRepo( 494 s.db, 495 db.FilterEq("did", user.Did), 496 db.FilterEq("name", repoName), 497 ) 498 if err == nil && existingRepo != nil { 499 l.Info("repo exists") 500 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 501 return 502 } 503 504 // create atproto record for this repo 505 rkey := tid.TID() 506 repo := &models.Repo{ 507 Did: user.Did, 508 Name: repoName, 509 Knot: domain, 510 Rkey: rkey, 511 Description: description, 512 Created: time.Now(), 513 Labels: models.DefaultLabelDefs(), 514 } 515 record := repo.AsRecord() 516 517 atpClient, err := s.oauth.AuthorizedClient(r) 518 if err != nil { 519 l.Info("PDS write failed", "err", err) 520 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 521 return 522 } 523 524 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 525 Collection: tangled.RepoNSID, 526 Repo: user.Did, 527 Rkey: rkey, 528 Record: &lexutil.LexiconTypeDecoder{ 529 Val: &record, 530 }, 531 }) 532 if err != nil { 533 l.Info("PDS write failed", "err", err) 534 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 535 return 536 } 537 538 aturi := atresp.Uri 539 l = l.With("aturi", aturi) 540 l.Info("wrote to PDS") 541 542 tx, err := s.db.BeginTx(r.Context(), nil) 543 if err != nil { 544 l.Info("txn failed", "err", err) 545 s.pages.Notice(w, "repo", "Failed to save repository information.") 546 return 547 } 548 549 // The rollback function reverts a few things on failure: 550 // - the pending txn 551 // - the ACLs 552 // - the atproto record created 553 rollback := func() { 554 err1 := tx.Rollback() 555 err2 := s.enforcer.E.LoadPolicy() 556 err3 := rollbackRecord(context.Background(), aturi, atpClient) 557 558 // ignore txn complete errors, this is okay 559 if errors.Is(err1, sql.ErrTxDone) { 560 err1 = nil 561 } 562 563 if errs := errors.Join(err1, err2, err3); errs != nil { 564 l.Error("failed to rollback changes", "errs", errs) 565 return 566 } 567 } 568 defer rollback() 569 570 client, err := s.oauth.ServiceClient( 571 r, 572 oauth.WithService(domain), 573 oauth.WithLxm(tangled.RepoCreateNSID), 574 oauth.WithDev(s.config.Core.Dev), 575 ) 576 if err != nil { 577 l.Error("service auth failed", "err", err) 578 s.pages.Notice(w, "repo", "Failed to reach PDS.") 579 return 580 } 581 582 xe := tangled.RepoCreate( 583 r.Context(), 584 client, 585 &tangled.RepoCreate_Input{ 586 Rkey: rkey, 587 }, 588 ) 589 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 590 l.Error("xrpc error", "xe", xe) 591 s.pages.Notice(w, "repo", err.Error()) 592 return 593 } 594 595 err = db.AddRepo(tx, repo) 596 if err != nil { 597 l.Error("db write failed", "err", err) 598 s.pages.Notice(w, "repo", "Failed to save repository information.") 599 return 600 } 601 602 // acls 603 p, _ := securejoin.SecureJoin(user.Did, repoName) 604 err = s.enforcer.AddRepo(user.Did, domain, p) 605 if err != nil { 606 l.Error("acl setup failed", "err", err) 607 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 608 return 609 } 610 611 err = tx.Commit() 612 if err != nil { 613 l.Error("txn commit failed", "err", err) 614 http.Error(w, err.Error(), http.StatusInternalServerError) 615 return 616 } 617 618 err = s.enforcer.E.SavePolicy() 619 if err != nil { 620 l.Error("acl save failed", "err", err) 621 http.Error(w, err.Error(), http.StatusInternalServerError) 622 return 623 } 624 625 // reset the ATURI because the transaction completed successfully 626 aturi = "" 627 628 s.notifier.NewRepo(r.Context(), repo) 629 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Did, repoName)) 630 } 631} 632 633// this is used to rollback changes made to the PDS 634// 635// it is a no-op if the provided ATURI is empty 636func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 637 if aturi == "" { 638 return nil 639 } 640 641 parsed := syntax.ATURI(aturi) 642 643 collection := parsed.Collection().String() 644 repo := parsed.Authority().String() 645 rkey := parsed.RecordKey().String() 646 647 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 648 Collection: collection, 649 Repo: repo, 650 Rkey: rkey, 651 }) 652 return err 653} 654 655func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error { 656 defaults := models.DefaultLabelDefs() 657 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults)) 658 if err != nil { 659 return err 660 } 661 // already present 662 if len(defaultLabels) == len(defaults) { 663 return nil 664 } 665 666 labelDefs, err := models.FetchDefaultDefs(r) 667 if err != nil { 668 return err 669 } 670 671 // Insert each label definition to the database 672 for _, labelDef := range labelDefs { 673 _, err = db.AddLabelDefinition(e, &labelDef) 674 if err != nil { 675 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 676 } 677 } 678 679 return nil 680}