forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/appview" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cache/session" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 dbnotify "tangled.org/core/appview/notify/db" 23 phnotify "tangled.org/core/appview/notify/posthog" 24 "tangled.org/core/appview/oauth" 25 "tangled.org/core/appview/pages" 26 "tangled.org/core/appview/reporesolver" 27 "tangled.org/core/appview/validator" 28 xrpcclient "tangled.org/core/appview/xrpcclient" 29 "tangled.org/core/eventconsumer" 30 "tangled.org/core/idresolver" 31 "tangled.org/core/jetstream" 32 tlog "tangled.org/core/log" 33 "tangled.org/core/rbac" 34 "tangled.org/core/tid" 35 36 comatproto "github.com/bluesky-social/indigo/api/atproto" 37 atpclient "github.com/bluesky-social/indigo/atproto/client" 38 "github.com/bluesky-social/indigo/atproto/syntax" 39 lexutil "github.com/bluesky-social/indigo/lex/util" 40 securejoin "github.com/cyphar/filepath-securejoin" 41 "github.com/go-chi/chi/v5" 42 "github.com/posthog/posthog-go" 43) 44 45type State struct { 46 db *db.DB 47 notifier notify.Notifier 48 oauth *oauth.OAuth 49 enforcer *rbac.Enforcer 50 pages *pages.Pages 51 sess *session.SessionStore 52 idResolver *idresolver.Resolver 53 posthog posthog.Client 54 jc *jetstream.JetstreamClient 55 config *config.Config 56 repoResolver *reporesolver.RepoResolver 57 knotstream *eventconsumer.Consumer 58 spindlestream *eventconsumer.Consumer 59 logger *slog.Logger 60 validator *validator.Validator 61} 62 63func Make(ctx context.Context, config *config.Config) (*State, error) { 64 d, err := db.Make(config.Core.DbPath) 65 if err != nil { 66 return nil, fmt.Errorf("failed to create db: %w", err) 67 } 68 69 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 70 if err != nil { 71 return nil, fmt.Errorf("failed to create enforcer: %w", err) 72 } 73 74 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 75 if err != nil { 76 log.Printf("failed to create redis resolver: %v", err) 77 res = idresolver.DefaultResolver() 78 } 79 80 pages := pages.NewPages(config, res) 81 cache := cache.New(config.Redis.Addr) 82 sess := session.New(cache) 83 oauth2, err := oauth.New(config) 84 if err != nil { 85 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 86 } 87 validator := validator.New(d, res, enforcer) 88 89 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 90 if err != nil { 91 return nil, fmt.Errorf("failed to create posthog client: %w", err) 92 } 93 94 repoResolver := reporesolver.New(config, enforcer, res, d) 95 96 wrapper := db.DbWrapper{Execer: d} 97 jc, err := jetstream.NewJetstreamClient( 98 config.Jetstream.Endpoint, 99 "appview", 100 []string{ 101 tangled.GraphFollowNSID, 102 tangled.FeedStarNSID, 103 tangled.PublicKeyNSID, 104 tangled.RepoArtifactNSID, 105 tangled.ActorProfileNSID, 106 tangled.SpindleMemberNSID, 107 tangled.SpindleNSID, 108 tangled.StringNSID, 109 tangled.RepoIssueNSID, 110 tangled.RepoIssueCommentNSID, 111 tangled.LabelDefinitionNSID, 112 tangled.LabelOpNSID, 113 }, 114 nil, 115 slog.Default(), 116 wrapper, 117 false, 118 119 // in-memory filter is inapplicalble to appview so 120 // we'll never log dids anyway. 121 false, 122 ) 123 if err != nil { 124 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 125 } 126 127 if err := BackfillDefaultDefs(d, res); err != nil { 128 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 129 } 130 131 ingester := appview.Ingester{ 132 Db: wrapper, 133 Enforcer: enforcer, 134 IdResolver: res, 135 Config: config, 136 Logger: tlog.New("ingester"), 137 Validator: validator, 138 } 139 err = jc.StartJetstream(ctx, ingester.Ingest()) 140 if err != nil { 141 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 142 } 143 144 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 145 if err != nil { 146 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 147 } 148 knotstream.Start(ctx) 149 150 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 151 if err != nil { 152 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 153 } 154 spindlestream.Start(ctx) 155 156 var notifiers []notify.Notifier 157 158 // Always add the database notifier 159 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 160 161 // Add other notifiers in production only 162 if !config.Core.Dev { 163 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 164 } 165 notifier := notify.NewMergedNotifier(notifiers...) 166 167 state := &State{ 168 d, 169 notifier, 170 oauth2, 171 enforcer, 172 pages, 173 sess, 174 res, 175 posthog, 176 jc, 177 config, 178 repoResolver, 179 knotstream, 180 spindlestream, 181 slog.Default(), 182 validator, 183 } 184 185 return state, nil 186} 187 188func (s *State) Close() error { 189 // other close up logic goes here 190 return s.db.Close() 191} 192 193func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 194 w.Header().Set("Content-Type", "image/svg+xml") 195 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 196 w.Header().Set("ETag", `"favicon-svg-v1"`) 197 198 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 199 w.WriteHeader(http.StatusNotModified) 200 return 201 } 202 203 s.pages.Favicon(w) 204} 205 206func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 207 w.Header().Set("Content-Type", "text/plain") 208 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 209 210 robotsTxt := `User-agent: * 211Allow: / 212` 213 w.Write([]byte(robotsTxt)) 214} 215 216// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest 217const manifestJson = `{ 218 "name": "tangled", 219 "description": "tightly-knit social coding.", 220 "icons": [ 221 { 222 "src": "/favicon.svg", 223 "sizes": "144x144" 224 } 225 ], 226 "start_url": "/", 227 "id": "org.tangled", 228 229 "display": "standalone", 230 "background_color": "#111827", 231 "theme_color": "#111827" 232}` 233 234func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) { 235 w.Header().Set("Content-Type", "application/json") 236 w.Write([]byte(manifestJson)) 237} 238 239func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 240 user := s.oauth.GetUser(r) 241 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 242 LoggedInUser: user, 243 }) 244} 245 246func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 247 user := s.oauth.GetUser(r) 248 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 249 LoggedInUser: user, 250 }) 251} 252 253func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 254 user := s.oauth.GetUser(r) 255 s.pages.Brand(w, pages.BrandParams{ 256 LoggedInUser: user, 257 }) 258} 259 260func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 261 if s.oauth.GetUser(r) != nil { 262 s.Timeline(w, r) 263 return 264 } 265 s.Home(w, r) 266} 267 268func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 269 user := s.oauth.GetUser(r) 270 271 var userDid string 272 if user != nil { 273 userDid = user.Did 274 } 275 timeline, err := db.MakeTimeline(s.db, 50, userDid) 276 if err != nil { 277 log.Println(err) 278 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 279 } 280 281 repos, err := db.GetTopStarredReposLastWeek(s.db) 282 if err != nil { 283 log.Println(err) 284 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 285 return 286 } 287 288 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", models.LabelGoodFirstIssue)) 289 if err != nil { 290 // non-fatal 291 } 292 293 s.pages.Timeline(w, pages.TimelineParams{ 294 LoggedInUser: user, 295 Timeline: timeline, 296 Repos: repos, 297 GfiLabel: gfiLabel, 298 }) 299} 300 301func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 302 user := s.oauth.GetUser(r) 303 if user == nil { 304 return 305 } 306 307 l := s.logger.With("handler", "UpgradeBanner") 308 l = l.With("did", user.Did) 309 310 regs, err := db.GetRegistrations( 311 s.db, 312 db.FilterEq("did", user.Did), 313 db.FilterEq("needs_upgrade", 1), 314 ) 315 if err != nil { 316 l.Error("non-fatal: failed to get registrations", "err", err) 317 } 318 319 spindles, err := db.GetSpindles( 320 s.db, 321 db.FilterEq("owner", user.Did), 322 db.FilterEq("needs_upgrade", 1), 323 ) 324 if err != nil { 325 l.Error("non-fatal: failed to get spindles", "err", err) 326 } 327 328 if regs == nil && spindles == nil { 329 return 330 } 331 332 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 333 Registrations: regs, 334 Spindles: spindles, 335 }) 336} 337 338func (s *State) Home(w http.ResponseWriter, r *http.Request) { 339 timeline, err := db.MakeTimeline(s.db, 5, "") 340 if err != nil { 341 log.Println(err) 342 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 343 return 344 } 345 346 repos, err := db.GetTopStarredReposLastWeek(s.db) 347 if err != nil { 348 log.Println(err) 349 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 350 return 351 } 352 353 s.pages.Home(w, pages.TimelineParams{ 354 LoggedInUser: nil, 355 Timeline: timeline, 356 Repos: repos, 357 }) 358} 359 360func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 361 user := chi.URLParam(r, "user") 362 user = strings.TrimPrefix(user, "@") 363 364 if user == "" { 365 w.WriteHeader(http.StatusBadRequest) 366 return 367 } 368 369 id, err := s.idResolver.ResolveIdent(r.Context(), user) 370 if err != nil { 371 w.WriteHeader(http.StatusInternalServerError) 372 return 373 } 374 375 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 376 if err != nil { 377 w.WriteHeader(http.StatusNotFound) 378 return 379 } 380 381 if len(pubKeys) == 0 { 382 w.WriteHeader(http.StatusNotFound) 383 return 384 } 385 386 for _, k := range pubKeys { 387 key := strings.TrimRight(k.Key, "\n") 388 fmt.Fprintln(w, key) 389 } 390} 391 392func validateRepoName(name string) error { 393 // check for path traversal attempts 394 if name == "." || name == ".." || 395 strings.Contains(name, "/") || strings.Contains(name, "\\") { 396 return fmt.Errorf("Repository name contains invalid path characters") 397 } 398 399 // check for sequences that could be used for traversal when normalized 400 if strings.Contains(name, "./") || strings.Contains(name, "../") || 401 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 402 return fmt.Errorf("Repository name contains invalid path sequence") 403 } 404 405 // then continue with character validation 406 for _, char := range name { 407 if !((char >= 'a' && char <= 'z') || 408 (char >= 'A' && char <= 'Z') || 409 (char >= '0' && char <= '9') || 410 char == '-' || char == '_' || char == '.') { 411 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 412 } 413 } 414 415 // additional check to prevent multiple sequential dots 416 if strings.Contains(name, "..") { 417 return fmt.Errorf("Repository name cannot contain sequential dots") 418 } 419 420 // if all checks pass 421 return nil 422} 423 424func stripGitExt(name string) string { 425 return strings.TrimSuffix(name, ".git") 426} 427 428func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 429 switch r.Method { 430 case http.MethodGet: 431 user := s.oauth.GetUser(r) 432 knots, err := s.enforcer.GetKnotsForUser(user.Did) 433 if err != nil { 434 s.pages.Notice(w, "repo", "Invalid user account.") 435 return 436 } 437 438 s.pages.NewRepo(w, pages.NewRepoParams{ 439 LoggedInUser: user, 440 Knots: knots, 441 }) 442 443 case http.MethodPost: 444 l := s.logger.With("handler", "NewRepo") 445 446 user := s.oauth.GetUser(r) 447 l = l.With("did", user.Did) 448 449 // form validation 450 domain := r.FormValue("domain") 451 if domain == "" { 452 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 453 return 454 } 455 l = l.With("knot", domain) 456 457 repoName := r.FormValue("name") 458 if repoName == "" { 459 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 460 return 461 } 462 463 if err := validateRepoName(repoName); err != nil { 464 s.pages.Notice(w, "repo", err.Error()) 465 return 466 } 467 repoName = stripGitExt(repoName) 468 l = l.With("repoName", repoName) 469 470 defaultBranch := r.FormValue("branch") 471 if defaultBranch == "" { 472 defaultBranch = "main" 473 } 474 l = l.With("defaultBranch", defaultBranch) 475 476 description := r.FormValue("description") 477 478 // ACL validation 479 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 480 if err != nil || !ok { 481 l.Info("unauthorized") 482 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 483 return 484 } 485 486 // Check for existing repos 487 existingRepo, err := db.GetRepo( 488 s.db, 489 db.FilterEq("did", user.Did), 490 db.FilterEq("name", repoName), 491 ) 492 if err == nil && existingRepo != nil { 493 l.Info("repo exists") 494 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 495 return 496 } 497 498 // create atproto record for this repo 499 rkey := tid.TID() 500 repo := &models.Repo{ 501 Did: user.Did, 502 Name: repoName, 503 Knot: domain, 504 Rkey: rkey, 505 Description: description, 506 Created: time.Now(), 507 Labels: models.DefaultLabelDefs(), 508 } 509 record := repo.AsRecord() 510 511 atpClient, err := s.oauth.AuthorizedClient(r) 512 if err != nil { 513 l.Info("PDS write failed", "err", err) 514 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 515 return 516 } 517 518 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 519 Collection: tangled.RepoNSID, 520 Repo: user.Did, 521 Rkey: rkey, 522 Record: &lexutil.LexiconTypeDecoder{ 523 Val: &record, 524 }, 525 }) 526 if err != nil { 527 l.Info("PDS write failed", "err", err) 528 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 529 return 530 } 531 532 aturi := atresp.Uri 533 l = l.With("aturi", aturi) 534 l.Info("wrote to PDS") 535 536 tx, err := s.db.BeginTx(r.Context(), nil) 537 if err != nil { 538 l.Info("txn failed", "err", err) 539 s.pages.Notice(w, "repo", "Failed to save repository information.") 540 return 541 } 542 543 // The rollback function reverts a few things on failure: 544 // - the pending txn 545 // - the ACLs 546 // - the atproto record created 547 rollback := func() { 548 err1 := tx.Rollback() 549 err2 := s.enforcer.E.LoadPolicy() 550 err3 := rollbackRecord(context.Background(), aturi, atpClient) 551 552 // ignore txn complete errors, this is okay 553 if errors.Is(err1, sql.ErrTxDone) { 554 err1 = nil 555 } 556 557 if errs := errors.Join(err1, err2, err3); errs != nil { 558 l.Error("failed to rollback changes", "errs", errs) 559 return 560 } 561 } 562 defer rollback() 563 564 client, err := s.oauth.ServiceClient( 565 r, 566 oauth.WithService(domain), 567 oauth.WithLxm(tangled.RepoCreateNSID), 568 oauth.WithDev(s.config.Core.Dev), 569 ) 570 if err != nil { 571 l.Error("service auth failed", "err", err) 572 s.pages.Notice(w, "repo", "Failed to reach PDS.") 573 return 574 } 575 576 xe := tangled.RepoCreate( 577 r.Context(), 578 client, 579 &tangled.RepoCreate_Input{ 580 Rkey: rkey, 581 }, 582 ) 583 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 584 l.Error("xrpc error", "xe", xe) 585 s.pages.Notice(w, "repo", err.Error()) 586 return 587 } 588 589 err = db.AddRepo(tx, repo) 590 if err != nil { 591 l.Error("db write failed", "err", err) 592 s.pages.Notice(w, "repo", "Failed to save repository information.") 593 return 594 } 595 596 // acls 597 p, _ := securejoin.SecureJoin(user.Did, repoName) 598 err = s.enforcer.AddRepo(user.Did, domain, p) 599 if err != nil { 600 l.Error("acl setup failed", "err", err) 601 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 602 return 603 } 604 605 err = tx.Commit() 606 if err != nil { 607 l.Error("txn commit failed", "err", err) 608 http.Error(w, err.Error(), http.StatusInternalServerError) 609 return 610 } 611 612 err = s.enforcer.E.SavePolicy() 613 if err != nil { 614 l.Error("acl save failed", "err", err) 615 http.Error(w, err.Error(), http.StatusInternalServerError) 616 return 617 } 618 619 // reset the ATURI because the transaction completed successfully 620 aturi = "" 621 622 s.notifier.NewRepo(r.Context(), repo) 623 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Did, repoName)) 624 } 625} 626 627// this is used to rollback changes made to the PDS 628// 629// it is a no-op if the provided ATURI is empty 630func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 631 if aturi == "" { 632 return nil 633 } 634 635 parsed := syntax.ATURI(aturi) 636 637 collection := parsed.Collection().String() 638 repo := parsed.Authority().String() 639 rkey := parsed.RecordKey().String() 640 641 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 642 Collection: collection, 643 Repo: repo, 644 Rkey: rkey, 645 }) 646 return err 647} 648 649func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error { 650 defaults := models.DefaultLabelDefs() 651 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults)) 652 if err != nil { 653 return err 654 } 655 // already present 656 if len(defaultLabels) == len(defaults) { 657 return nil 658 } 659 660 labelDefs, err := models.FetchDefaultDefs(r) 661 if err != nil { 662 return err 663 } 664 665 // Insert each label definition to the database 666 for _, labelDef := range labelDefs { 667 _, err = db.AddLabelDefinition(e, &labelDef) 668 if err != nil { 669 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 670 } 671 } 672 673 return nil 674}