forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/appview" 16 "tangled.org/core/appview/cache" 17 "tangled.org/core/appview/cache/session" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 dbnotify "tangled.org/core/appview/notify/db" 23 phnotify "tangled.org/core/appview/notify/posthog" 24 "tangled.org/core/appview/oauth" 25 "tangled.org/core/appview/pages" 26 "tangled.org/core/appview/reporesolver" 27 "tangled.org/core/appview/validator" 28 xrpcclient "tangled.org/core/appview/xrpcclient" 29 "tangled.org/core/eventconsumer" 30 "tangled.org/core/idresolver" 31 "tangled.org/core/jetstream" 32 tlog "tangled.org/core/log" 33 "tangled.org/core/rbac" 34 "tangled.org/core/tid" 35 36 comatproto "github.com/bluesky-social/indigo/api/atproto" 37 atpclient "github.com/bluesky-social/indigo/atproto/client" 38 "github.com/bluesky-social/indigo/atproto/syntax" 39 lexutil "github.com/bluesky-social/indigo/lex/util" 40 securejoin "github.com/cyphar/filepath-securejoin" 41 "github.com/go-chi/chi/v5" 42 "github.com/posthog/posthog-go" 43) 44 45type State struct { 46 db *db.DB 47 notifier notify.Notifier 48 oauth *oauth.OAuth 49 enforcer *rbac.Enforcer 50 pages *pages.Pages 51 sess *session.SessionStore 52 idResolver *idresolver.Resolver 53 posthog posthog.Client 54 jc *jetstream.JetstreamClient 55 config *config.Config 56 repoResolver *reporesolver.RepoResolver 57 knotstream *eventconsumer.Consumer 58 spindlestream *eventconsumer.Consumer 59 logger *slog.Logger 60 validator *validator.Validator 61} 62 63func Make(ctx context.Context, config *config.Config) (*State, error) { 64 d, err := db.Make(config.Core.DbPath) 65 if err != nil { 66 return nil, fmt.Errorf("failed to create db: %w", err) 67 } 68 69 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 70 if err != nil { 71 return nil, fmt.Errorf("failed to create enforcer: %w", err) 72 } 73 74 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 75 if err != nil { 76 log.Printf("failed to create redis resolver: %v", err) 77 res = idresolver.DefaultResolver() 78 } 79 80 pages := pages.NewPages(config, res) 81 cache := cache.New(config.Redis.Addr) 82 sess := session.New(cache) 83 oauth2, err := oauth.New(config) 84 if err != nil { 85 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 86 } 87 validator := validator.New(d, res, enforcer) 88 89 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 90 if err != nil { 91 return nil, fmt.Errorf("failed to create posthog client: %w", err) 92 } 93 94 repoResolver := reporesolver.New(config, enforcer, res, d) 95 96 wrapper := db.DbWrapper{Execer: d} 97 jc, err := jetstream.NewJetstreamClient( 98 config.Jetstream.Endpoint, 99 "appview", 100 []string{ 101 tangled.GraphFollowNSID, 102 tangled.FeedStarNSID, 103 tangled.PublicKeyNSID, 104 tangled.RepoArtifactNSID, 105 tangled.ActorProfileNSID, 106 tangled.SpindleMemberNSID, 107 tangled.SpindleNSID, 108 tangled.StringNSID, 109 tangled.RepoIssueNSID, 110 tangled.RepoIssueCommentNSID, 111 tangled.LabelDefinitionNSID, 112 tangled.LabelOpNSID, 113 }, 114 nil, 115 slog.Default(), 116 wrapper, 117 false, 118 119 // in-memory filter is inapplicalble to appview so 120 // we'll never log dids anyway. 121 false, 122 ) 123 if err != nil { 124 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 125 } 126 127 if err := BackfillDefaultDefs(d, res); err != nil { 128 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 129 } 130 131 ingester := appview.Ingester{ 132 Db: wrapper, 133 Enforcer: enforcer, 134 IdResolver: res, 135 Config: config, 136 Logger: tlog.New("ingester"), 137 Validator: validator, 138 } 139 err = jc.StartJetstream(ctx, ingester.Ingest()) 140 if err != nil { 141 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 142 } 143 144 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 145 if err != nil { 146 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 147 } 148 knotstream.Start(ctx) 149 150 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 151 if err != nil { 152 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 153 } 154 spindlestream.Start(ctx) 155 156 var notifiers []notify.Notifier 157 158 // Always add the database notifier 159 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 160 161 // Add other notifiers in production only 162 if !config.Core.Dev { 163 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 164 } 165 notifier := notify.NewMergedNotifier(notifiers...) 166 167 state := &State{ 168 d, 169 notifier, 170 oauth2, 171 enforcer, 172 pages, 173 sess, 174 res, 175 posthog, 176 jc, 177 config, 178 repoResolver, 179 knotstream, 180 spindlestream, 181 slog.Default(), 182 validator, 183 } 184 185 return state, nil 186} 187 188func (s *State) Close() error { 189 // other close up logic goes here 190 return s.db.Close() 191} 192 193func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 194 w.Header().Set("Content-Type", "image/svg+xml") 195 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 196 w.Header().Set("ETag", `"favicon-svg-v1"`) 197 198 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 199 w.WriteHeader(http.StatusNotModified) 200 return 201 } 202 203 s.pages.Favicon(w) 204} 205 206// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest 207const manifestJson = `{ 208 "name": "tangled", 209 "description": "tightly-knit social coding.", 210 "icons": [ 211 { 212 "src": "/favicon.svg", 213 "sizes": "144x144" 214 } 215 ], 216 "start_url": "/", 217 "id": "org.tangled", 218 219 "display": "standalone", 220 "background_color": "#111827", 221 "theme_color": "#111827" 222}` 223 224func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) { 225 w.Header().Set("Content-Type", "application/json") 226 w.Write([]byte(manifestJson)) 227} 228 229func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 230 user := s.oauth.GetUser(r) 231 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 232 LoggedInUser: user, 233 }) 234} 235 236func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 237 user := s.oauth.GetUser(r) 238 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 239 LoggedInUser: user, 240 }) 241} 242 243func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 244 user := s.oauth.GetUser(r) 245 s.pages.Brand(w, pages.BrandParams{ 246 LoggedInUser: user, 247 }) 248} 249 250func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 251 if s.oauth.GetUser(r) != nil { 252 s.Timeline(w, r) 253 return 254 } 255 s.Home(w, r) 256} 257 258func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 259 user := s.oauth.GetUser(r) 260 261 var userDid string 262 if user != nil { 263 userDid = user.Did 264 } 265 timeline, err := db.MakeTimeline(s.db, 50, userDid) 266 if err != nil { 267 log.Println(err) 268 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 269 } 270 271 repos, err := db.GetTopStarredReposLastWeek(s.db) 272 if err != nil { 273 log.Println(err) 274 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 275 return 276 } 277 278 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", models.LabelGoodFirstIssue)) 279 if err != nil { 280 // non-fatal 281 } 282 283 s.pages.Timeline(w, pages.TimelineParams{ 284 LoggedInUser: user, 285 Timeline: timeline, 286 Repos: repos, 287 GfiLabel: gfiLabel, 288 }) 289} 290 291func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 292 user := s.oauth.GetUser(r) 293 if user == nil { 294 return 295 } 296 297 l := s.logger.With("handler", "UpgradeBanner") 298 l = l.With("did", user.Did) 299 300 regs, err := db.GetRegistrations( 301 s.db, 302 db.FilterEq("did", user.Did), 303 db.FilterEq("needs_upgrade", 1), 304 ) 305 if err != nil { 306 l.Error("non-fatal: failed to get registrations", "err", err) 307 } 308 309 spindles, err := db.GetSpindles( 310 s.db, 311 db.FilterEq("owner", user.Did), 312 db.FilterEq("needs_upgrade", 1), 313 ) 314 if err != nil { 315 l.Error("non-fatal: failed to get spindles", "err", err) 316 } 317 318 if regs == nil && spindles == nil { 319 return 320 } 321 322 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 323 Registrations: regs, 324 Spindles: spindles, 325 }) 326} 327 328func (s *State) Home(w http.ResponseWriter, r *http.Request) { 329 timeline, err := db.MakeTimeline(s.db, 5, "") 330 if err != nil { 331 log.Println(err) 332 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 333 return 334 } 335 336 repos, err := db.GetTopStarredReposLastWeek(s.db) 337 if err != nil { 338 log.Println(err) 339 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 340 return 341 } 342 343 s.pages.Home(w, pages.TimelineParams{ 344 LoggedInUser: nil, 345 Timeline: timeline, 346 Repos: repos, 347 }) 348} 349 350func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 351 user := chi.URLParam(r, "user") 352 user = strings.TrimPrefix(user, "@") 353 354 if user == "" { 355 w.WriteHeader(http.StatusBadRequest) 356 return 357 } 358 359 id, err := s.idResolver.ResolveIdent(r.Context(), user) 360 if err != nil { 361 w.WriteHeader(http.StatusInternalServerError) 362 return 363 } 364 365 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 366 if err != nil { 367 w.WriteHeader(http.StatusNotFound) 368 return 369 } 370 371 if len(pubKeys) == 0 { 372 w.WriteHeader(http.StatusNotFound) 373 return 374 } 375 376 for _, k := range pubKeys { 377 key := strings.TrimRight(k.Key, "\n") 378 fmt.Fprintln(w, key) 379 } 380} 381 382func validateRepoName(name string) error { 383 // check for path traversal attempts 384 if name == "." || name == ".." || 385 strings.Contains(name, "/") || strings.Contains(name, "\\") { 386 return fmt.Errorf("Repository name contains invalid path characters") 387 } 388 389 // check for sequences that could be used for traversal when normalized 390 if strings.Contains(name, "./") || strings.Contains(name, "../") || 391 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 392 return fmt.Errorf("Repository name contains invalid path sequence") 393 } 394 395 // then continue with character validation 396 for _, char := range name { 397 if !((char >= 'a' && char <= 'z') || 398 (char >= 'A' && char <= 'Z') || 399 (char >= '0' && char <= '9') || 400 char == '-' || char == '_' || char == '.') { 401 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 402 } 403 } 404 405 // additional check to prevent multiple sequential dots 406 if strings.Contains(name, "..") { 407 return fmt.Errorf("Repository name cannot contain sequential dots") 408 } 409 410 // if all checks pass 411 return nil 412} 413 414func stripGitExt(name string) string { 415 return strings.TrimSuffix(name, ".git") 416} 417 418func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 419 switch r.Method { 420 case http.MethodGet: 421 user := s.oauth.GetUser(r) 422 knots, err := s.enforcer.GetKnotsForUser(user.Did) 423 if err != nil { 424 s.pages.Notice(w, "repo", "Invalid user account.") 425 return 426 } 427 428 s.pages.NewRepo(w, pages.NewRepoParams{ 429 LoggedInUser: user, 430 Knots: knots, 431 }) 432 433 case http.MethodPost: 434 l := s.logger.With("handler", "NewRepo") 435 436 user := s.oauth.GetUser(r) 437 l = l.With("did", user.Did) 438 439 // form validation 440 domain := r.FormValue("domain") 441 if domain == "" { 442 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 443 return 444 } 445 l = l.With("knot", domain) 446 447 repoName := r.FormValue("name") 448 if repoName == "" { 449 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 450 return 451 } 452 453 if err := validateRepoName(repoName); err != nil { 454 s.pages.Notice(w, "repo", err.Error()) 455 return 456 } 457 repoName = stripGitExt(repoName) 458 l = l.With("repoName", repoName) 459 460 defaultBranch := r.FormValue("branch") 461 if defaultBranch == "" { 462 defaultBranch = "main" 463 } 464 l = l.With("defaultBranch", defaultBranch) 465 466 description := r.FormValue("description") 467 468 // ACL validation 469 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 470 if err != nil || !ok { 471 l.Info("unauthorized") 472 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 473 return 474 } 475 476 // Check for existing repos 477 existingRepo, err := db.GetRepo( 478 s.db, 479 db.FilterEq("did", user.Did), 480 db.FilterEq("name", repoName), 481 ) 482 if err == nil && existingRepo != nil { 483 l.Info("repo exists") 484 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 485 return 486 } 487 488 // create atproto record for this repo 489 rkey := tid.TID() 490 repo := &models.Repo{ 491 Did: user.Did, 492 Name: repoName, 493 Knot: domain, 494 Rkey: rkey, 495 Description: description, 496 Created: time.Now(), 497 Labels: models.DefaultLabelDefs(), 498 } 499 record := repo.AsRecord() 500 501 atpClient, err := s.oauth.AuthorizedClient(r) 502 if err != nil { 503 l.Info("PDS write failed", "err", err) 504 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 505 return 506 } 507 508 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 509 Collection: tangled.RepoNSID, 510 Repo: user.Did, 511 Rkey: rkey, 512 Record: &lexutil.LexiconTypeDecoder{ 513 Val: &record, 514 }, 515 }) 516 if err != nil { 517 l.Info("PDS write failed", "err", err) 518 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 519 return 520 } 521 522 aturi := atresp.Uri 523 l = l.With("aturi", aturi) 524 l.Info("wrote to PDS") 525 526 tx, err := s.db.BeginTx(r.Context(), nil) 527 if err != nil { 528 l.Info("txn failed", "err", err) 529 s.pages.Notice(w, "repo", "Failed to save repository information.") 530 return 531 } 532 533 // The rollback function reverts a few things on failure: 534 // - the pending txn 535 // - the ACLs 536 // - the atproto record created 537 rollback := func() { 538 err1 := tx.Rollback() 539 err2 := s.enforcer.E.LoadPolicy() 540 err3 := rollbackRecord(context.Background(), aturi, atpClient) 541 542 // ignore txn complete errors, this is okay 543 if errors.Is(err1, sql.ErrTxDone) { 544 err1 = nil 545 } 546 547 if errs := errors.Join(err1, err2, err3); errs != nil { 548 l.Error("failed to rollback changes", "errs", errs) 549 return 550 } 551 } 552 defer rollback() 553 554 client, err := s.oauth.ServiceClient( 555 r, 556 oauth.WithService(domain), 557 oauth.WithLxm(tangled.RepoCreateNSID), 558 oauth.WithDev(s.config.Core.Dev), 559 ) 560 if err != nil { 561 l.Error("service auth failed", "err", err) 562 s.pages.Notice(w, "repo", "Failed to reach PDS.") 563 return 564 } 565 566 xe := tangled.RepoCreate( 567 r.Context(), 568 client, 569 &tangled.RepoCreate_Input{ 570 Rkey: rkey, 571 }, 572 ) 573 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 574 l.Error("xrpc error", "xe", xe) 575 s.pages.Notice(w, "repo", err.Error()) 576 return 577 } 578 579 err = db.AddRepo(tx, repo) 580 if err != nil { 581 l.Error("db write failed", "err", err) 582 s.pages.Notice(w, "repo", "Failed to save repository information.") 583 return 584 } 585 586 // acls 587 p, _ := securejoin.SecureJoin(user.Did, repoName) 588 err = s.enforcer.AddRepo(user.Did, domain, p) 589 if err != nil { 590 l.Error("acl setup failed", "err", err) 591 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 592 return 593 } 594 595 err = tx.Commit() 596 if err != nil { 597 l.Error("txn commit failed", "err", err) 598 http.Error(w, err.Error(), http.StatusInternalServerError) 599 return 600 } 601 602 err = s.enforcer.E.SavePolicy() 603 if err != nil { 604 l.Error("acl save failed", "err", err) 605 http.Error(w, err.Error(), http.StatusInternalServerError) 606 return 607 } 608 609 // reset the ATURI because the transaction completed successfully 610 aturi = "" 611 612 s.notifier.NewRepo(r.Context(), repo) 613 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Did, repoName)) 614 } 615} 616 617// this is used to rollback changes made to the PDS 618// 619// it is a no-op if the provided ATURI is empty 620func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 621 if aturi == "" { 622 return nil 623 } 624 625 parsed := syntax.ATURI(aturi) 626 627 collection := parsed.Collection().String() 628 repo := parsed.Authority().String() 629 rkey := parsed.RecordKey().String() 630 631 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 632 Collection: collection, 633 Repo: repo, 634 Rkey: rkey, 635 }) 636 return err 637} 638 639func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error { 640 defaults := models.DefaultLabelDefs() 641 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults)) 642 if err != nil { 643 return err 644 } 645 // already present 646 if len(defaultLabels) == len(defaults) { 647 return nil 648 } 649 650 labelDefs, err := models.FetchDefaultDefs(r) 651 if err != nil { 652 return err 653 } 654 655 // Insert each label definition to the database 656 for _, labelDef := range labelDefs { 657 _, err = db.AddLabelDefinition(e, &labelDef) 658 if err != nil { 659 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 660 } 661 } 662 663 return nil 664}