forked from tangled.org/core
this repo has no description
at master 16 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 securejoin "github.com/cyphar/filepath-securejoin" 18 "github.com/go-chi/chi/v5" 19 "github.com/posthog/posthog-go" 20 "tangled.org/core/api/tangled" 21 "tangled.org/core/appview" 22 "tangled.org/core/appview/cache" 23 "tangled.org/core/appview/cache/session" 24 "tangled.org/core/appview/config" 25 "tangled.org/core/appview/db" 26 "tangled.org/core/appview/models" 27 "tangled.org/core/appview/notify" 28 dbnotify "tangled.org/core/appview/notify/db" 29 phnotify "tangled.org/core/appview/notify/posthog" 30 "tangled.org/core/appview/oauth" 31 "tangled.org/core/appview/pages" 32 "tangled.org/core/appview/reporesolver" 33 "tangled.org/core/appview/validator" 34 xrpcclient "tangled.org/core/appview/xrpcclient" 35 "tangled.org/core/eventconsumer" 36 "tangled.org/core/idresolver" 37 "tangled.org/core/jetstream" 38 tlog "tangled.org/core/log" 39 "tangled.org/core/rbac" 40 "tangled.org/core/tid" 41) 42 43type State struct { 44 db *db.DB 45 notifier notify.Notifier 46 oauth *oauth.OAuth 47 enforcer *rbac.Enforcer 48 pages *pages.Pages 49 sess *session.SessionStore 50 idResolver *idresolver.Resolver 51 posthog posthog.Client 52 jc *jetstream.JetstreamClient 53 config *config.Config 54 repoResolver *reporesolver.RepoResolver 55 knotstream *eventconsumer.Consumer 56 spindlestream *eventconsumer.Consumer 57 logger *slog.Logger 58 validator *validator.Validator 59} 60 61func Make(ctx context.Context, config *config.Config) (*State, error) { 62 d, err := db.Make(config.Core.DbPath) 63 if err != nil { 64 return nil, fmt.Errorf("failed to create db: %w", err) 65 } 66 67 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 68 if err != nil { 69 return nil, fmt.Errorf("failed to create enforcer: %w", err) 70 } 71 72 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 73 if err != nil { 74 log.Printf("failed to create redis resolver: %v", err) 75 res = idresolver.DefaultResolver() 76 } 77 78 pgs := pages.NewPages(config, res) 79 cache := cache.New(config.Redis.Addr) 80 sess := session.New(cache) 81 oauth := oauth.NewOAuth(config, sess) 82 validator := validator.New(d, res, enforcer) 83 84 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create posthog client: %w", err) 87 } 88 89 repoResolver := reporesolver.New(config, enforcer, res, d) 90 91 wrapper := db.DbWrapper{Execer: d} 92 jc, err := jetstream.NewJetstreamClient( 93 config.Jetstream.Endpoint, 94 "appview", 95 []string{ 96 tangled.GraphFollowNSID, 97 tangled.FeedStarNSID, 98 tangled.PublicKeyNSID, 99 tangled.RepoArtifactNSID, 100 tangled.ActorProfileNSID, 101 tangled.SpindleMemberNSID, 102 tangled.SpindleNSID, 103 tangled.StringNSID, 104 tangled.RepoIssueNSID, 105 tangled.RepoIssueCommentNSID, 106 tangled.LabelDefinitionNSID, 107 tangled.LabelOpNSID, 108 }, 109 nil, 110 slog.Default(), 111 wrapper, 112 false, 113 114 // in-memory filter is inapplicalble to appview so 115 // we'll never log dids anyway. 116 false, 117 ) 118 if err != nil { 119 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 120 } 121 122 if err := BackfillDefaultDefs(d, res); err != nil { 123 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 124 } 125 126 ingester := appview.Ingester{ 127 Db: wrapper, 128 Enforcer: enforcer, 129 IdResolver: res, 130 Config: config, 131 Logger: tlog.New("ingester"), 132 Validator: validator, 133 } 134 err = jc.StartJetstream(ctx, ingester.Ingest()) 135 if err != nil { 136 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 137 } 138 139 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 140 if err != nil { 141 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 142 } 143 knotstream.Start(ctx) 144 145 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 146 if err != nil { 147 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 148 } 149 spindlestream.Start(ctx) 150 151 var notifiers []notify.Notifier 152 153 // Always add the database notifier 154 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 155 156 // Add other notifiers in production only 157 if !config.Core.Dev { 158 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 159 } 160 notifier := notify.NewMergedNotifier(notifiers...) 161 162 state := &State{ 163 d, 164 notifier, 165 oauth, 166 enforcer, 167 pgs, 168 sess, 169 res, 170 posthog, 171 jc, 172 config, 173 repoResolver, 174 knotstream, 175 spindlestream, 176 slog.Default(), 177 validator, 178 } 179 180 return state, nil 181} 182 183func (s *State) Close() error { 184 // other close up logic goes here 185 return s.db.Close() 186} 187 188func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 189 w.Header().Set("Content-Type", "image/svg+xml") 190 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 191 w.Header().Set("ETag", `"favicon-svg-v1"`) 192 193 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 194 w.WriteHeader(http.StatusNotModified) 195 return 196 } 197 198 s.pages.Favicon(w) 199} 200 201// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest 202const manifestJson = `{ 203 "name": "tangled", 204 "description": "tightly-knit social coding.", 205 "icons": [ 206 { 207 "src": "/favicon.svg", 208 "sizes": "144x144" 209 } 210 ], 211 "start_url": "/", 212 "id": "org.tangled", 213 214 "display": "standalone", 215 "background_color": "#111827", 216 "theme_color": "#111827" 217}` 218 219func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) { 220 w.Header().Set("Content-Type", "application/json") 221 w.Write([]byte(manifestJson)) 222} 223 224func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 225 user := s.oauth.GetUser(r) 226 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 227 LoggedInUser: user, 228 }) 229} 230 231func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 232 user := s.oauth.GetUser(r) 233 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 234 LoggedInUser: user, 235 }) 236} 237 238func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 239 user := s.oauth.GetUser(r) 240 s.pages.Brand(w, pages.BrandParams{ 241 LoggedInUser: user, 242 }) 243} 244 245func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 246 if s.oauth.GetUser(r) != nil { 247 s.Timeline(w, r) 248 return 249 } 250 s.Home(w, r) 251} 252 253func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 254 user := s.oauth.GetUser(r) 255 256 var userDid string 257 if user != nil { 258 userDid = user.Did 259 } 260 timeline, err := db.MakeTimeline(s.db, 50, userDid) 261 if err != nil { 262 log.Println(err) 263 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 264 } 265 266 repos, err := db.GetTopStarredReposLastWeek(s.db) 267 if err != nil { 268 log.Println(err) 269 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 270 return 271 } 272 273 s.pages.Timeline(w, pages.TimelineParams{ 274 LoggedInUser: user, 275 Timeline: timeline, 276 Repos: repos, 277 }) 278} 279 280func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 281 user := s.oauth.GetUser(r) 282 if user == nil { 283 return 284 } 285 286 l := s.logger.With("handler", "UpgradeBanner") 287 l = l.With("did", user.Did) 288 l = l.With("handle", user.Handle) 289 290 regs, err := db.GetRegistrations( 291 s.db, 292 db.FilterEq("did", user.Did), 293 db.FilterEq("needs_upgrade", 1), 294 ) 295 if err != nil { 296 l.Error("non-fatal: failed to get registrations", "err", err) 297 } 298 299 spindles, err := db.GetSpindles( 300 s.db, 301 db.FilterEq("owner", user.Did), 302 db.FilterEq("needs_upgrade", 1), 303 ) 304 if err != nil { 305 l.Error("non-fatal: failed to get spindles", "err", err) 306 } 307 308 if regs == nil && spindles == nil { 309 return 310 } 311 312 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 313 Registrations: regs, 314 Spindles: spindles, 315 }) 316} 317 318func (s *State) Home(w http.ResponseWriter, r *http.Request) { 319 timeline, err := db.MakeTimeline(s.db, 5, "") 320 if err != nil { 321 log.Println(err) 322 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 323 return 324 } 325 326 repos, err := db.GetTopStarredReposLastWeek(s.db) 327 if err != nil { 328 log.Println(err) 329 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 330 return 331 } 332 333 s.pages.Home(w, pages.TimelineParams{ 334 LoggedInUser: nil, 335 Timeline: timeline, 336 Repos: repos, 337 }) 338} 339 340func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 341 user := chi.URLParam(r, "user") 342 user = strings.TrimPrefix(user, "@") 343 344 if user == "" { 345 w.WriteHeader(http.StatusBadRequest) 346 return 347 } 348 349 id, err := s.idResolver.ResolveIdent(r.Context(), user) 350 if err != nil { 351 w.WriteHeader(http.StatusInternalServerError) 352 return 353 } 354 355 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 356 if err != nil { 357 w.WriteHeader(http.StatusNotFound) 358 return 359 } 360 361 if len(pubKeys) == 0 { 362 w.WriteHeader(http.StatusNotFound) 363 return 364 } 365 366 for _, k := range pubKeys { 367 key := strings.TrimRight(k.Key, "\n") 368 fmt.Fprintln(w, key) 369 } 370} 371 372func validateRepoName(name string) error { 373 // check for path traversal attempts 374 if name == "." || name == ".." || 375 strings.Contains(name, "/") || strings.Contains(name, "\\") { 376 return fmt.Errorf("Repository name contains invalid path characters") 377 } 378 379 // check for sequences that could be used for traversal when normalized 380 if strings.Contains(name, "./") || strings.Contains(name, "../") || 381 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 382 return fmt.Errorf("Repository name contains invalid path sequence") 383 } 384 385 // then continue with character validation 386 for _, char := range name { 387 if !((char >= 'a' && char <= 'z') || 388 (char >= 'A' && char <= 'Z') || 389 (char >= '0' && char <= '9') || 390 char == '-' || char == '_' || char == '.') { 391 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 392 } 393 } 394 395 // additional check to prevent multiple sequential dots 396 if strings.Contains(name, "..") { 397 return fmt.Errorf("Repository name cannot contain sequential dots") 398 } 399 400 // if all checks pass 401 return nil 402} 403 404func stripGitExt(name string) string { 405 return strings.TrimSuffix(name, ".git") 406} 407 408func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 409 switch r.Method { 410 case http.MethodGet: 411 user := s.oauth.GetUser(r) 412 knots, err := s.enforcer.GetKnotsForUser(user.Did) 413 if err != nil { 414 s.pages.Notice(w, "repo", "Invalid user account.") 415 return 416 } 417 418 s.pages.NewRepo(w, pages.NewRepoParams{ 419 LoggedInUser: user, 420 Knots: knots, 421 }) 422 423 case http.MethodPost: 424 l := s.logger.With("handler", "NewRepo") 425 426 user := s.oauth.GetUser(r) 427 l = l.With("did", user.Did) 428 l = l.With("handle", user.Handle) 429 430 // form validation 431 domain := r.FormValue("domain") 432 if domain == "" { 433 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 434 return 435 } 436 l = l.With("knot", domain) 437 438 repoName := r.FormValue("name") 439 if repoName == "" { 440 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 441 return 442 } 443 444 if err := validateRepoName(repoName); err != nil { 445 s.pages.Notice(w, "repo", err.Error()) 446 return 447 } 448 repoName = stripGitExt(repoName) 449 l = l.With("repoName", repoName) 450 451 defaultBranch := r.FormValue("branch") 452 if defaultBranch == "" { 453 defaultBranch = "main" 454 } 455 l = l.With("defaultBranch", defaultBranch) 456 457 description := r.FormValue("description") 458 459 // ACL validation 460 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 461 if err != nil || !ok { 462 l.Info("unauthorized") 463 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 464 return 465 } 466 467 // Check for existing repos 468 existingRepo, err := db.GetRepo( 469 s.db, 470 db.FilterEq("did", user.Did), 471 db.FilterEq("name", repoName), 472 ) 473 if err == nil && existingRepo != nil { 474 l.Info("repo exists") 475 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 476 return 477 } 478 479 // create atproto record for this repo 480 rkey := tid.TID() 481 repo := &models.Repo{ 482 Did: user.Did, 483 Name: repoName, 484 Knot: domain, 485 Rkey: rkey, 486 Description: description, 487 Created: time.Now(), 488 Labels: models.DefaultLabelDefs(), 489 } 490 record := repo.AsRecord() 491 492 xrpcClient, err := s.oauth.AuthorizedClient(r) 493 if err != nil { 494 l.Info("PDS write failed", "err", err) 495 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 496 return 497 } 498 499 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 500 Collection: tangled.RepoNSID, 501 Repo: user.Did, 502 Rkey: rkey, 503 Record: &lexutil.LexiconTypeDecoder{ 504 Val: &record, 505 }, 506 }) 507 if err != nil { 508 l.Info("PDS write failed", "err", err) 509 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 510 return 511 } 512 513 aturi := atresp.Uri 514 l = l.With("aturi", aturi) 515 l.Info("wrote to PDS") 516 517 tx, err := s.db.BeginTx(r.Context(), nil) 518 if err != nil { 519 l.Info("txn failed", "err", err) 520 s.pages.Notice(w, "repo", "Failed to save repository information.") 521 return 522 } 523 524 // The rollback function reverts a few things on failure: 525 // - the pending txn 526 // - the ACLs 527 // - the atproto record created 528 rollback := func() { 529 err1 := tx.Rollback() 530 err2 := s.enforcer.E.LoadPolicy() 531 err3 := rollbackRecord(context.Background(), aturi, xrpcClient) 532 533 // ignore txn complete errors, this is okay 534 if errors.Is(err1, sql.ErrTxDone) { 535 err1 = nil 536 } 537 538 if errs := errors.Join(err1, err2, err3); errs != nil { 539 l.Error("failed to rollback changes", "errs", errs) 540 return 541 } 542 } 543 defer rollback() 544 545 client, err := s.oauth.ServiceClient( 546 r, 547 oauth.WithService(domain), 548 oauth.WithLxm(tangled.RepoCreateNSID), 549 oauth.WithDev(s.config.Core.Dev), 550 ) 551 if err != nil { 552 l.Error("service auth failed", "err", err) 553 s.pages.Notice(w, "repo", "Failed to reach PDS.") 554 return 555 } 556 557 xe := tangled.RepoCreate( 558 r.Context(), 559 client, 560 &tangled.RepoCreate_Input{ 561 Rkey: rkey, 562 }, 563 ) 564 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 565 l.Error("xrpc error", "xe", xe) 566 s.pages.Notice(w, "repo", err.Error()) 567 return 568 } 569 570 err = db.AddRepo(tx, repo) 571 if err != nil { 572 l.Error("db write failed", "err", err) 573 s.pages.Notice(w, "repo", "Failed to save repository information.") 574 return 575 } 576 577 // acls 578 p, _ := securejoin.SecureJoin(user.Did, repoName) 579 err = s.enforcer.AddRepo(user.Did, domain, p) 580 if err != nil { 581 l.Error("acl setup failed", "err", err) 582 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 583 return 584 } 585 586 err = tx.Commit() 587 if err != nil { 588 l.Error("txn commit failed", "err", err) 589 http.Error(w, err.Error(), http.StatusInternalServerError) 590 return 591 } 592 593 err = s.enforcer.E.SavePolicy() 594 if err != nil { 595 l.Error("acl save failed", "err", err) 596 http.Error(w, err.Error(), http.StatusInternalServerError) 597 return 598 } 599 600 // reset the ATURI because the transaction completed successfully 601 aturi = "" 602 603 s.notifier.NewRepo(r.Context(), repo) 604 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 605 } 606} 607 608// this is used to rollback changes made to the PDS 609// 610// it is a no-op if the provided ATURI is empty 611func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 612 if aturi == "" { 613 return nil 614 } 615 616 parsed := syntax.ATURI(aturi) 617 618 collection := parsed.Collection().String() 619 repo := parsed.Authority().String() 620 rkey := parsed.RecordKey().String() 621 622 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 623 Collection: collection, 624 Repo: repo, 625 Rkey: rkey, 626 }) 627 return err 628} 629 630func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error { 631 defaults := models.DefaultLabelDefs() 632 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults)) 633 if err != nil { 634 return err 635 } 636 // already present 637 if len(defaultLabels) == len(defaults) { 638 return nil 639 } 640 641 labelDefs, err := models.FetchDefaultDefs(r) 642 if err != nil { 643 return err 644 } 645 646 // Insert each label definition to the database 647 for _, labelDef := range labelDefs { 648 _, err = db.AddLabelDefinition(e, &labelDef) 649 if err != nil { 650 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 651 } 652 } 653 654 return nil 655}