forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/indexer" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/notify" 20 dbnotify "tangled.org/core/appview/notify/db" 21 phnotify "tangled.org/core/appview/notify/posthog" 22 "tangled.org/core/appview/oauth" 23 "tangled.org/core/appview/pages" 24 "tangled.org/core/appview/reporesolver" 25 "tangled.org/core/appview/validator" 26 xrpcclient "tangled.org/core/appview/xrpcclient" 27 "tangled.org/core/eventconsumer" 28 "tangled.org/core/idresolver" 29 "tangled.org/core/jetstream" 30 "tangled.org/core/log" 31 tlog "tangled.org/core/log" 32 "tangled.org/core/rbac" 33 "tangled.org/core/tid" 34 35 comatproto "github.com/bluesky-social/indigo/api/atproto" 36 atpclient "github.com/bluesky-social/indigo/atproto/client" 37 "github.com/bluesky-social/indigo/atproto/syntax" 38 lexutil "github.com/bluesky-social/indigo/lex/util" 39 securejoin "github.com/cyphar/filepath-securejoin" 40 "github.com/go-chi/chi/v5" 41 "github.com/posthog/posthog-go" 42) 43 44type State struct { 45 db *db.DB 46 notifier notify.Notifier 47 indexer *indexer.Indexer 48 oauth *oauth.OAuth 49 enforcer *rbac.Enforcer 50 pages *pages.Pages 51 idResolver *idresolver.Resolver 52 posthog posthog.Client 53 jc *jetstream.JetstreamClient 54 config *config.Config 55 repoResolver *reporesolver.RepoResolver 56 knotstream *eventconsumer.Consumer 57 spindlestream *eventconsumer.Consumer 58 logger *slog.Logger 59 validator *validator.Validator 60} 61 62func Make(ctx context.Context, config *config.Config) (*State, error) { 63 logger := tlog.FromContext(ctx) 64 65 d, err := db.Make(ctx, config.Core.DbPath) 66 if err != nil { 67 return nil, fmt.Errorf("failed to create db: %w", err) 68 } 69 70 indexer := indexer.New(log.SubLogger(logger, "indexer")) 71 err = indexer.Init(ctx, d) 72 if err != nil { 73 return nil, fmt.Errorf("failed to create indexer: %w", err) 74 } 75 76 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 77 if err != nil { 78 return nil, fmt.Errorf("failed to create enforcer: %w", err) 79 } 80 81 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 82 if err != nil { 83 logger.Error("failed to create redis resolver", "err", err) 84 res = idresolver.DefaultResolver(config.Plc.PLCURL) 85 } 86 87 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 88 if err != nil { 89 return nil, fmt.Errorf("failed to create posthog client: %w", err) 90 } 91 92 pages := pages.NewPages(config, res, log.SubLogger(logger, "pages")) 93 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 94 if err != nil { 95 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 96 } 97 validator := validator.New(d, res, enforcer) 98 99 repoResolver := reporesolver.New(config, enforcer, res, d) 100 101 wrapper := db.DbWrapper{Execer: d} 102 jc, err := jetstream.NewJetstreamClient( 103 config.Jetstream.Endpoint, 104 "appview", 105 []string{ 106 tangled.GraphFollowNSID, 107 tangled.FeedStarNSID, 108 tangled.PublicKeyNSID, 109 tangled.RepoArtifactNSID, 110 tangled.ActorProfileNSID, 111 tangled.SpindleMemberNSID, 112 tangled.SpindleNSID, 113 tangled.StringNSID, 114 tangled.RepoIssueNSID, 115 tangled.RepoIssueCommentNSID, 116 tangled.LabelDefinitionNSID, 117 tangled.LabelOpNSID, 118 }, 119 nil, 120 tlog.SubLogger(logger, "jetstream"), 121 wrapper, 122 false, 123 124 // in-memory filter is inapplicalble to appview so 125 // we'll never log dids anyway. 126 false, 127 ) 128 if err != nil { 129 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 130 } 131 132 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 133 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 134 } 135 136 ingester := appview.Ingester{ 137 Db: wrapper, 138 Enforcer: enforcer, 139 IdResolver: res, 140 Config: config, 141 Logger: log.SubLogger(logger, "ingester"), 142 Validator: validator, 143 } 144 err = jc.StartJetstream(ctx, ingester.Ingest()) 145 if err != nil { 146 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 147 } 148 149 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 150 if err != nil { 151 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 152 } 153 knotstream.Start(ctx) 154 155 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 156 if err != nil { 157 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 158 } 159 spindlestream.Start(ctx) 160 161 var notifiers []notify.Notifier 162 163 // Always add the database notifier 164 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 165 166 // Add other notifiers in production only 167 if !config.Core.Dev { 168 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 169 } 170 notifiers = append(notifiers, indexer) 171 notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify")) 172 173 state := &State{ 174 d, 175 notifier, 176 indexer, 177 oauth, 178 enforcer, 179 pages, 180 res, 181 posthog, 182 jc, 183 config, 184 repoResolver, 185 knotstream, 186 spindlestream, 187 logger, 188 validator, 189 } 190 191 return state, nil 192} 193 194func (s *State) Close() error { 195 // other close up logic goes here 196 return s.db.Close() 197} 198 199func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 200 w.Header().Set("Content-Type", "image/svg+xml") 201 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 202 w.Header().Set("ETag", `"favicon-svg-v1"`) 203 204 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 205 w.WriteHeader(http.StatusNotModified) 206 return 207 } 208 209 s.pages.Favicon(w) 210} 211 212func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 213 w.Header().Set("Content-Type", "text/plain") 214 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 215 216 robotsTxt := `User-agent: * 217Allow: / 218` 219 w.Write([]byte(robotsTxt)) 220} 221 222// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest 223const manifestJson = `{ 224 "name": "tangled", 225 "description": "tightly-knit social coding.", 226 "icons": [ 227 { 228 "src": "/favicon.svg", 229 "sizes": "144x144" 230 } 231 ], 232 "start_url": "/", 233 "id": "org.tangled", 234 235 "display": "standalone", 236 "background_color": "#111827", 237 "theme_color": "#111827" 238}` 239 240func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) { 241 w.Header().Set("Content-Type", "application/json") 242 w.Write([]byte(manifestJson)) 243} 244 245func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 246 user := s.oauth.GetUser(r) 247 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 248 LoggedInUser: user, 249 }) 250} 251 252func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 253 user := s.oauth.GetUser(r) 254 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 255 LoggedInUser: user, 256 }) 257} 258 259func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 260 user := s.oauth.GetUser(r) 261 s.pages.Brand(w, pages.BrandParams{ 262 LoggedInUser: user, 263 }) 264} 265 266func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 267 if s.oauth.GetUser(r) != nil { 268 s.Timeline(w, r) 269 return 270 } 271 s.Home(w, r) 272} 273 274func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 275 user := s.oauth.GetUser(r) 276 277 // TODO: set this flag based on the UI 278 filtered := false 279 280 var userDid string 281 if user != nil { 282 userDid = user.Did 283 } 284 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 285 if err != nil { 286 s.logger.Error("failed to make timeline", "err", err) 287 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 288 } 289 290 repos, err := db.GetTopStarredReposLastWeek(s.db) 291 if err != nil { 292 s.logger.Error("failed to get top starred repos", "err", err) 293 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 294 return 295 } 296 297 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", s.config.Label.GoodFirstIssue)) 298 if err != nil { 299 // non-fatal 300 } 301 302 s.pages.Timeline(w, pages.TimelineParams{ 303 LoggedInUser: user, 304 Timeline: timeline, 305 Repos: repos, 306 GfiLabel: gfiLabel, 307 }) 308} 309 310func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 311 user := s.oauth.GetUser(r) 312 if user == nil { 313 return 314 } 315 316 l := s.logger.With("handler", "UpgradeBanner") 317 l = l.With("did", user.Did) 318 319 regs, err := db.GetRegistrations( 320 s.db, 321 db.FilterEq("did", user.Did), 322 db.FilterEq("needs_upgrade", 1), 323 ) 324 if err != nil { 325 l.Error("non-fatal: failed to get registrations", "err", err) 326 } 327 328 spindles, err := db.GetSpindles( 329 s.db, 330 db.FilterEq("owner", user.Did), 331 db.FilterEq("needs_upgrade", 1), 332 ) 333 if err != nil { 334 l.Error("non-fatal: failed to get spindles", "err", err) 335 } 336 337 if regs == nil && spindles == nil { 338 return 339 } 340 341 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 342 Registrations: regs, 343 Spindles: spindles, 344 }) 345} 346 347func (s *State) Home(w http.ResponseWriter, r *http.Request) { 348 // TODO: set this flag based on the UI 349 filtered := false 350 351 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 352 if err != nil { 353 s.logger.Error("failed to make timeline", "err", err) 354 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 355 return 356 } 357 358 repos, err := db.GetTopStarredReposLastWeek(s.db) 359 if err != nil { 360 s.logger.Error("failed to get top starred repos", "err", err) 361 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 362 return 363 } 364 365 s.pages.Home(w, pages.TimelineParams{ 366 LoggedInUser: nil, 367 Timeline: timeline, 368 Repos: repos, 369 }) 370} 371 372func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 373 user := chi.URLParam(r, "user") 374 user = strings.TrimPrefix(user, "@") 375 376 if user == "" { 377 w.WriteHeader(http.StatusBadRequest) 378 return 379 } 380 381 id, err := s.idResolver.ResolveIdent(r.Context(), user) 382 if err != nil { 383 w.WriteHeader(http.StatusInternalServerError) 384 return 385 } 386 387 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 388 if err != nil { 389 s.logger.Error("failed to get public keys", "err", err) 390 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 391 return 392 } 393 394 if len(pubKeys) == 0 { 395 w.WriteHeader(http.StatusNoContent) 396 return 397 } 398 399 for _, k := range pubKeys { 400 key := strings.TrimRight(k.Key, "\n") 401 fmt.Fprintln(w, key) 402 } 403} 404 405func validateRepoName(name string) error { 406 // check for path traversal attempts 407 if name == "." || name == ".." || 408 strings.Contains(name, "/") || strings.Contains(name, "\\") { 409 return fmt.Errorf("Repository name contains invalid path characters") 410 } 411 412 // check for sequences that could be used for traversal when normalized 413 if strings.Contains(name, "./") || strings.Contains(name, "../") || 414 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 415 return fmt.Errorf("Repository name contains invalid path sequence") 416 } 417 418 // then continue with character validation 419 for _, char := range name { 420 if !((char >= 'a' && char <= 'z') || 421 (char >= 'A' && char <= 'Z') || 422 (char >= '0' && char <= '9') || 423 char == '-' || char == '_' || char == '.') { 424 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 425 } 426 } 427 428 // additional check to prevent multiple sequential dots 429 if strings.Contains(name, "..") { 430 return fmt.Errorf("Repository name cannot contain sequential dots") 431 } 432 433 // if all checks pass 434 return nil 435} 436 437func stripGitExt(name string) string { 438 return strings.TrimSuffix(name, ".git") 439} 440 441func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 442 switch r.Method { 443 case http.MethodGet: 444 user := s.oauth.GetUser(r) 445 knots, err := s.enforcer.GetKnotsForUser(user.Did) 446 if err != nil { 447 s.pages.Notice(w, "repo", "Invalid user account.") 448 return 449 } 450 451 s.pages.NewRepo(w, pages.NewRepoParams{ 452 LoggedInUser: user, 453 Knots: knots, 454 }) 455 456 case http.MethodPost: 457 l := s.logger.With("handler", "NewRepo") 458 459 user := s.oauth.GetUser(r) 460 l = l.With("did", user.Did) 461 462 // form validation 463 domain := r.FormValue("domain") 464 if domain == "" { 465 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 466 return 467 } 468 l = l.With("knot", domain) 469 470 repoName := r.FormValue("name") 471 if repoName == "" { 472 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 473 return 474 } 475 476 if err := validateRepoName(repoName); err != nil { 477 s.pages.Notice(w, "repo", err.Error()) 478 return 479 } 480 repoName = stripGitExt(repoName) 481 l = l.With("repoName", repoName) 482 483 defaultBranch := r.FormValue("branch") 484 if defaultBranch == "" { 485 defaultBranch = "main" 486 } 487 l = l.With("defaultBranch", defaultBranch) 488 489 description := r.FormValue("description") 490 491 // ACL validation 492 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 493 if err != nil || !ok { 494 l.Info("unauthorized") 495 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 496 return 497 } 498 499 // Check for existing repos 500 existingRepo, err := db.GetRepo( 501 s.db, 502 db.FilterEq("did", user.Did), 503 db.FilterEq("name", repoName), 504 ) 505 if err == nil && existingRepo != nil { 506 l.Info("repo exists") 507 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 508 return 509 } 510 511 // create atproto record for this repo 512 rkey := tid.TID() 513 repo := &models.Repo{ 514 Did: user.Did, 515 Name: repoName, 516 Knot: domain, 517 Rkey: rkey, 518 Description: description, 519 Created: time.Now(), 520 Labels: s.config.Label.DefaultLabelDefs, 521 } 522 record := repo.AsRecord() 523 524 atpClient, err := s.oauth.AuthorizedClient(r) 525 if err != nil { 526 l.Info("PDS write failed", "err", err) 527 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 528 return 529 } 530 531 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 532 Collection: tangled.RepoNSID, 533 Repo: user.Did, 534 Rkey: rkey, 535 Record: &lexutil.LexiconTypeDecoder{ 536 Val: &record, 537 }, 538 }) 539 if err != nil { 540 l.Info("PDS write failed", "err", err) 541 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 542 return 543 } 544 545 aturi := atresp.Uri 546 l = l.With("aturi", aturi) 547 l.Info("wrote to PDS") 548 549 tx, err := s.db.BeginTx(r.Context(), nil) 550 if err != nil { 551 l.Info("txn failed", "err", err) 552 s.pages.Notice(w, "repo", "Failed to save repository information.") 553 return 554 } 555 556 // The rollback function reverts a few things on failure: 557 // - the pending txn 558 // - the ACLs 559 // - the atproto record created 560 rollback := func() { 561 err1 := tx.Rollback() 562 err2 := s.enforcer.E.LoadPolicy() 563 err3 := rollbackRecord(context.Background(), aturi, atpClient) 564 565 // ignore txn complete errors, this is okay 566 if errors.Is(err1, sql.ErrTxDone) { 567 err1 = nil 568 } 569 570 if errs := errors.Join(err1, err2, err3); errs != nil { 571 l.Error("failed to rollback changes", "errs", errs) 572 return 573 } 574 } 575 defer rollback() 576 577 client, err := s.oauth.ServiceClient( 578 r, 579 oauth.WithService(domain), 580 oauth.WithLxm(tangled.RepoCreateNSID), 581 oauth.WithDev(s.config.Core.Dev), 582 ) 583 if err != nil { 584 l.Error("service auth failed", "err", err) 585 s.pages.Notice(w, "repo", "Failed to reach PDS.") 586 return 587 } 588 589 xe := tangled.RepoCreate( 590 r.Context(), 591 client, 592 &tangled.RepoCreate_Input{ 593 Rkey: rkey, 594 }, 595 ) 596 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 597 l.Error("xrpc error", "xe", xe) 598 s.pages.Notice(w, "repo", err.Error()) 599 return 600 } 601 602 err = db.AddRepo(tx, repo) 603 if err != nil { 604 l.Error("db write failed", "err", err) 605 s.pages.Notice(w, "repo", "Failed to save repository information.") 606 return 607 } 608 609 // acls 610 p, _ := securejoin.SecureJoin(user.Did, repoName) 611 err = s.enforcer.AddRepo(user.Did, domain, p) 612 if err != nil { 613 l.Error("acl setup failed", "err", err) 614 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 615 return 616 } 617 618 err = tx.Commit() 619 if err != nil { 620 l.Error("txn commit failed", "err", err) 621 http.Error(w, err.Error(), http.StatusInternalServerError) 622 return 623 } 624 625 err = s.enforcer.E.SavePolicy() 626 if err != nil { 627 l.Error("acl save failed", "err", err) 628 http.Error(w, err.Error(), http.StatusInternalServerError) 629 return 630 } 631 632 // reset the ATURI because the transaction completed successfully 633 aturi = "" 634 635 s.notifier.NewRepo(r.Context(), repo) 636 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, repoName)) 637 } 638} 639 640// this is used to rollback changes made to the PDS 641// 642// it is a no-op if the provided ATURI is empty 643func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 644 if aturi == "" { 645 return nil 646 } 647 648 parsed := syntax.ATURI(aturi) 649 650 collection := parsed.Collection().String() 651 repo := parsed.Authority().String() 652 rkey := parsed.RecordKey().String() 653 654 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 655 Collection: collection, 656 Repo: repo, 657 Rkey: rkey, 658 }) 659 return err 660} 661 662func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 663 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults)) 664 if err != nil { 665 return err 666 } 667 // already present 668 if len(defaultLabels) == len(defaults) { 669 return nil 670 } 671 672 labelDefs, err := models.FetchLabelDefs(r, defaults) 673 if err != nil { 674 return err 675 } 676 677 // Insert each label definition to the database 678 for _, labelDef := range labelDefs { 679 _, err = db.AddLabelDefinition(e, &labelDef) 680 if err != nil { 681 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 682 } 683 } 684 685 return nil 686}