forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/indexer" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/notify" 20 dbnotify "tangled.org/core/appview/notify/db" 21 phnotify "tangled.org/core/appview/notify/posthog" 22 "tangled.org/core/appview/oauth" 23 "tangled.org/core/appview/pages" 24 "tangled.org/core/appview/refresolver" 25 "tangled.org/core/appview/reporesolver" 26 "tangled.org/core/appview/validator" 27 xrpcclient "tangled.org/core/appview/xrpcclient" 28 "tangled.org/core/eventconsumer" 29 "tangled.org/core/idresolver" 30 "tangled.org/core/jetstream" 31 "tangled.org/core/log" 32 tlog "tangled.org/core/log" 33 "tangled.org/core/rbac" 34 "tangled.org/core/tid" 35 36 comatproto "github.com/bluesky-social/indigo/api/atproto" 37 atpclient "github.com/bluesky-social/indigo/atproto/client" 38 "github.com/bluesky-social/indigo/atproto/syntax" 39 lexutil "github.com/bluesky-social/indigo/lex/util" 40 securejoin "github.com/cyphar/filepath-securejoin" 41 "github.com/go-chi/chi/v5" 42 "github.com/posthog/posthog-go" 43) 44 45type State struct { 46 db *db.DB 47 notifier notify.Notifier 48 indexer *indexer.Indexer 49 oauth *oauth.OAuth 50 enforcer *rbac.Enforcer 51 pages *pages.Pages 52 idResolver *idresolver.Resolver 53 refResolver *refresolver.Resolver 54 posthog posthog.Client 55 jc *jetstream.JetstreamClient 56 config *config.Config 57 repoResolver *reporesolver.RepoResolver 58 knotstream *eventconsumer.Consumer 59 spindlestream *eventconsumer.Consumer 60 logger *slog.Logger 61 validator *validator.Validator 62} 63 64func Make(ctx context.Context, config *config.Config) (*State, error) { 65 logger := tlog.FromContext(ctx) 66 67 d, err := db.Make(ctx, config.Core.DbPath) 68 if err != nil { 69 return nil, fmt.Errorf("failed to create db: %w", err) 70 } 71 72 indexer := indexer.New(log.SubLogger(logger, "indexer")) 73 err = indexer.Init(ctx, d) 74 if err != nil { 75 return nil, fmt.Errorf("failed to create indexer: %w", err) 76 } 77 78 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 79 if err != nil { 80 return nil, fmt.Errorf("failed to create enforcer: %w", err) 81 } 82 83 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 84 if err != nil { 85 logger.Error("failed to create redis resolver", "err", err) 86 res = idresolver.DefaultResolver(config.Plc.PLCURL) 87 } 88 89 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 90 if err != nil { 91 return nil, fmt.Errorf("failed to create posthog client: %w", err) 92 } 93 94 pages := pages.NewPages(config, res, log.SubLogger(logger, "pages")) 95 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 96 if err != nil { 97 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 98 } 99 validator := validator.New(d, res, enforcer) 100 101 repoResolver := reporesolver.New(config, enforcer, res, d) 102 103 refResolver := refresolver.New(config, res, d, log.SubLogger(logger, "refResolver")) 104 105 wrapper := db.DbWrapper{Execer: d} 106 jc, err := jetstream.NewJetstreamClient( 107 config.Jetstream.Endpoint, 108 "appview", 109 []string{ 110 tangled.GraphFollowNSID, 111 tangled.FeedStarNSID, 112 tangled.PublicKeyNSID, 113 tangled.RepoArtifactNSID, 114 tangled.ActorProfileNSID, 115 tangled.SpindleMemberNSID, 116 tangled.SpindleNSID, 117 tangled.StringNSID, 118 tangled.RepoIssueNSID, 119 tangled.RepoIssueCommentNSID, 120 tangled.LabelDefinitionNSID, 121 tangled.LabelOpNSID, 122 }, 123 nil, 124 tlog.SubLogger(logger, "jetstream"), 125 wrapper, 126 false, 127 128 // in-memory filter is inapplicalble to appview so 129 // we'll never log dids anyway. 130 false, 131 ) 132 if err != nil { 133 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 134 } 135 136 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 137 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 138 } 139 140 ingester := appview.Ingester{ 141 Db: wrapper, 142 Enforcer: enforcer, 143 IdResolver: res, 144 Config: config, 145 Logger: log.SubLogger(logger, "ingester"), 146 Validator: validator, 147 } 148 err = jc.StartJetstream(ctx, ingester.Ingest()) 149 if err != nil { 150 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 151 } 152 153 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 154 if err != nil { 155 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 156 } 157 knotstream.Start(ctx) 158 159 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 160 if err != nil { 161 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 162 } 163 spindlestream.Start(ctx) 164 165 var notifiers []notify.Notifier 166 167 // Always add the database notifier 168 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 169 170 // Add other notifiers in production only 171 if !config.Core.Dev { 172 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 173 } 174 notifiers = append(notifiers, indexer) 175 notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify")) 176 177 state := &State{ 178 d, 179 notifier, 180 indexer, 181 oauth, 182 enforcer, 183 pages, 184 res, 185 refResolver, 186 posthog, 187 jc, 188 config, 189 repoResolver, 190 knotstream, 191 spindlestream, 192 logger, 193 validator, 194 } 195 196 return state, nil 197} 198 199func (s *State) Close() error { 200 // other close up logic goes here 201 return s.db.Close() 202} 203 204func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 205 w.Header().Set("Content-Type", "image/svg+xml") 206 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 207 w.Header().Set("ETag", `"favicon-svg-v1"`) 208 209 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 210 w.WriteHeader(http.StatusNotModified) 211 return 212 } 213 214 s.pages.Favicon(w) 215} 216 217func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 218 w.Header().Set("Content-Type", "text/plain") 219 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 220 221 robotsTxt := `User-agent: * 222Allow: / 223` 224 w.Write([]byte(robotsTxt)) 225} 226 227// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest 228const manifestJson = `{ 229 "name": "tangled", 230 "description": "tightly-knit social coding.", 231 "icons": [ 232 { 233 "src": "/favicon.svg", 234 "sizes": "144x144" 235 } 236 ], 237 "start_url": "/", 238 "id": "org.tangled", 239 240 "display": "standalone", 241 "background_color": "#111827", 242 "theme_color": "#111827" 243}` 244 245func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) { 246 w.Header().Set("Content-Type", "application/json") 247 w.Write([]byte(manifestJson)) 248} 249 250func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 251 user := s.oauth.GetUser(r) 252 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 253 LoggedInUser: user, 254 }) 255} 256 257func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 258 user := s.oauth.GetUser(r) 259 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 260 LoggedInUser: user, 261 }) 262} 263 264func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 265 user := s.oauth.GetUser(r) 266 s.pages.Brand(w, pages.BrandParams{ 267 LoggedInUser: user, 268 }) 269} 270 271func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 272 if s.oauth.GetUser(r) != nil { 273 s.Timeline(w, r) 274 return 275 } 276 s.Home(w, r) 277} 278 279func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 280 user := s.oauth.GetUser(r) 281 282 // TODO: set this flag based on the UI 283 filtered := false 284 285 var userDid string 286 if user != nil { 287 userDid = user.Did 288 } 289 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 290 if err != nil { 291 s.logger.Error("failed to make timeline", "err", err) 292 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 293 } 294 295 repos, err := db.GetTopStarredReposLastWeek(s.db) 296 if err != nil { 297 s.logger.Error("failed to get top starred repos", "err", err) 298 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 299 return 300 } 301 302 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", s.config.Label.GoodFirstIssue)) 303 if err != nil { 304 // non-fatal 305 } 306 307 s.pages.Timeline(w, pages.TimelineParams{ 308 LoggedInUser: user, 309 Timeline: timeline, 310 Repos: repos, 311 GfiLabel: gfiLabel, 312 }) 313} 314 315func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 316 user := s.oauth.GetUser(r) 317 if user == nil { 318 return 319 } 320 321 l := s.logger.With("handler", "UpgradeBanner") 322 l = l.With("did", user.Did) 323 324 regs, err := db.GetRegistrations( 325 s.db, 326 db.FilterEq("did", user.Did), 327 db.FilterEq("needs_upgrade", 1), 328 ) 329 if err != nil { 330 l.Error("non-fatal: failed to get registrations", "err", err) 331 } 332 333 spindles, err := db.GetSpindles( 334 s.db, 335 db.FilterEq("owner", user.Did), 336 db.FilterEq("needs_upgrade", 1), 337 ) 338 if err != nil { 339 l.Error("non-fatal: failed to get spindles", "err", err) 340 } 341 342 if regs == nil && spindles == nil { 343 return 344 } 345 346 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 347 Registrations: regs, 348 Spindles: spindles, 349 }) 350} 351 352func (s *State) Home(w http.ResponseWriter, r *http.Request) { 353 // TODO: set this flag based on the UI 354 filtered := false 355 356 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 357 if err != nil { 358 s.logger.Error("failed to make timeline", "err", err) 359 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 360 return 361 } 362 363 repos, err := db.GetTopStarredReposLastWeek(s.db) 364 if err != nil { 365 s.logger.Error("failed to get top starred repos", "err", err) 366 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 367 return 368 } 369 370 s.pages.Home(w, pages.TimelineParams{ 371 LoggedInUser: nil, 372 Timeline: timeline, 373 Repos: repos, 374 }) 375} 376 377func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 378 user := chi.URLParam(r, "user") 379 user = strings.TrimPrefix(user, "@") 380 381 if user == "" { 382 w.WriteHeader(http.StatusBadRequest) 383 return 384 } 385 386 id, err := s.idResolver.ResolveIdent(r.Context(), user) 387 if err != nil { 388 w.WriteHeader(http.StatusInternalServerError) 389 return 390 } 391 392 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 393 if err != nil { 394 s.logger.Error("failed to get public keys", "err", err) 395 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 396 return 397 } 398 399 if len(pubKeys) == 0 { 400 w.WriteHeader(http.StatusNoContent) 401 return 402 } 403 404 for _, k := range pubKeys { 405 key := strings.TrimRight(k.Key, "\n") 406 fmt.Fprintln(w, key) 407 } 408} 409 410func validateRepoName(name string) error { 411 // check for path traversal attempts 412 if name == "." || name == ".." || 413 strings.Contains(name, "/") || strings.Contains(name, "\\") { 414 return fmt.Errorf("Repository name contains invalid path characters") 415 } 416 417 // check for sequences that could be used for traversal when normalized 418 if strings.Contains(name, "./") || strings.Contains(name, "../") || 419 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 420 return fmt.Errorf("Repository name contains invalid path sequence") 421 } 422 423 // then continue with character validation 424 for _, char := range name { 425 if !((char >= 'a' && char <= 'z') || 426 (char >= 'A' && char <= 'Z') || 427 (char >= '0' && char <= '9') || 428 char == '-' || char == '_' || char == '.') { 429 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 430 } 431 } 432 433 // additional check to prevent multiple sequential dots 434 if strings.Contains(name, "..") { 435 return fmt.Errorf("Repository name cannot contain sequential dots") 436 } 437 438 // if all checks pass 439 return nil 440} 441 442func stripGitExt(name string) string { 443 return strings.TrimSuffix(name, ".git") 444} 445 446func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 447 switch r.Method { 448 case http.MethodGet: 449 user := s.oauth.GetUser(r) 450 knots, err := s.enforcer.GetKnotsForUser(user.Did) 451 if err != nil { 452 s.pages.Notice(w, "repo", "Invalid user account.") 453 return 454 } 455 456 s.pages.NewRepo(w, pages.NewRepoParams{ 457 LoggedInUser: user, 458 Knots: knots, 459 }) 460 461 case http.MethodPost: 462 l := s.logger.With("handler", "NewRepo") 463 464 user := s.oauth.GetUser(r) 465 l = l.With("did", user.Did) 466 467 // form validation 468 domain := r.FormValue("domain") 469 if domain == "" { 470 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 471 return 472 } 473 l = l.With("knot", domain) 474 475 repoName := r.FormValue("name") 476 if repoName == "" { 477 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 478 return 479 } 480 481 if err := validateRepoName(repoName); err != nil { 482 s.pages.Notice(w, "repo", err.Error()) 483 return 484 } 485 repoName = stripGitExt(repoName) 486 l = l.With("repoName", repoName) 487 488 defaultBranch := r.FormValue("branch") 489 if defaultBranch == "" { 490 defaultBranch = "main" 491 } 492 l = l.With("defaultBranch", defaultBranch) 493 494 description := r.FormValue("description") 495 496 // ACL validation 497 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 498 if err != nil || !ok { 499 l.Info("unauthorized") 500 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 501 return 502 } 503 504 // Check for existing repos 505 existingRepo, err := db.GetRepo( 506 s.db, 507 db.FilterEq("did", user.Did), 508 db.FilterEq("name", repoName), 509 ) 510 if err == nil && existingRepo != nil { 511 l.Info("repo exists") 512 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 513 return 514 } 515 516 // create atproto record for this repo 517 rkey := tid.TID() 518 repo := &models.Repo{ 519 Did: user.Did, 520 Name: repoName, 521 Knot: domain, 522 Rkey: rkey, 523 Description: description, 524 Created: time.Now(), 525 Labels: s.config.Label.DefaultLabelDefs, 526 } 527 record := repo.AsRecord() 528 529 atpClient, err := s.oauth.AuthorizedClient(r) 530 if err != nil { 531 l.Info("PDS write failed", "err", err) 532 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 533 return 534 } 535 536 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 537 Collection: tangled.RepoNSID, 538 Repo: user.Did, 539 Rkey: rkey, 540 Record: &lexutil.LexiconTypeDecoder{ 541 Val: &record, 542 }, 543 }) 544 if err != nil { 545 l.Info("PDS write failed", "err", err) 546 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 547 return 548 } 549 550 aturi := atresp.Uri 551 l = l.With("aturi", aturi) 552 l.Info("wrote to PDS") 553 554 tx, err := s.db.BeginTx(r.Context(), nil) 555 if err != nil { 556 l.Info("txn failed", "err", err) 557 s.pages.Notice(w, "repo", "Failed to save repository information.") 558 return 559 } 560 561 // The rollback function reverts a few things on failure: 562 // - the pending txn 563 // - the ACLs 564 // - the atproto record created 565 rollback := func() { 566 err1 := tx.Rollback() 567 err2 := s.enforcer.E.LoadPolicy() 568 err3 := rollbackRecord(context.Background(), aturi, atpClient) 569 570 // ignore txn complete errors, this is okay 571 if errors.Is(err1, sql.ErrTxDone) { 572 err1 = nil 573 } 574 575 if errs := errors.Join(err1, err2, err3); errs != nil { 576 l.Error("failed to rollback changes", "errs", errs) 577 return 578 } 579 } 580 defer rollback() 581 582 client, err := s.oauth.ServiceClient( 583 r, 584 oauth.WithService(domain), 585 oauth.WithLxm(tangled.RepoCreateNSID), 586 oauth.WithDev(s.config.Core.Dev), 587 ) 588 if err != nil { 589 l.Error("service auth failed", "err", err) 590 s.pages.Notice(w, "repo", "Failed to reach PDS.") 591 return 592 } 593 594 xe := tangled.RepoCreate( 595 r.Context(), 596 client, 597 &tangled.RepoCreate_Input{ 598 Rkey: rkey, 599 }, 600 ) 601 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 602 l.Error("xrpc error", "xe", xe) 603 s.pages.Notice(w, "repo", err.Error()) 604 return 605 } 606 607 err = db.AddRepo(tx, repo) 608 if err != nil { 609 l.Error("db write failed", "err", err) 610 s.pages.Notice(w, "repo", "Failed to save repository information.") 611 return 612 } 613 614 // acls 615 p, _ := securejoin.SecureJoin(user.Did, repoName) 616 err = s.enforcer.AddRepo(user.Did, domain, p) 617 if err != nil { 618 l.Error("acl setup failed", "err", err) 619 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 620 return 621 } 622 623 err = tx.Commit() 624 if err != nil { 625 l.Error("txn commit failed", "err", err) 626 http.Error(w, err.Error(), http.StatusInternalServerError) 627 return 628 } 629 630 err = s.enforcer.E.SavePolicy() 631 if err != nil { 632 l.Error("acl save failed", "err", err) 633 http.Error(w, err.Error(), http.StatusInternalServerError) 634 return 635 } 636 637 // reset the ATURI because the transaction completed successfully 638 aturi = "" 639 640 s.notifier.NewRepo(r.Context(), repo) 641 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, repoName)) 642 } 643} 644 645// this is used to rollback changes made to the PDS 646// 647// it is a no-op if the provided ATURI is empty 648func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 649 if aturi == "" { 650 return nil 651 } 652 653 parsed := syntax.ATURI(aturi) 654 655 collection := parsed.Collection().String() 656 repo := parsed.Authority().String() 657 rkey := parsed.RecordKey().String() 658 659 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 660 Collection: collection, 661 Repo: repo, 662 Rkey: rkey, 663 }) 664 return err 665} 666 667func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 668 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults)) 669 if err != nil { 670 return err 671 } 672 // already present 673 if len(defaultLabels) == len(defaults) { 674 return nil 675 } 676 677 labelDefs, err := models.FetchLabelDefs(r, defaults) 678 if err != nil { 679 return err 680 } 681 682 // Insert each label definition to the database 683 for _, labelDef := range labelDefs { 684 _, err = db.AddLabelDefinition(e, &labelDef) 685 if err != nil { 686 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 687 } 688 } 689 690 return nil 691}