forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 securejoin "github.com/cyphar/filepath-securejoin" 18 "github.com/go-chi/chi/v5" 19 "github.com/posthog/posthog-go" 20 "tangled.org/core/api/tangled" 21 "tangled.org/core/appview" 22 "tangled.org/core/appview/cache" 23 "tangled.org/core/appview/cache/session" 24 "tangled.org/core/appview/config" 25 "tangled.org/core/appview/db" 26 "tangled.org/core/appview/models" 27 "tangled.org/core/appview/notify" 28 "tangled.org/core/appview/oauth" 29 "tangled.org/core/appview/pages" 30 posthogService "tangled.org/core/appview/posthog" 31 "tangled.org/core/appview/reporesolver" 32 "tangled.org/core/appview/validator" 33 xrpcclient "tangled.org/core/appview/xrpcclient" 34 "tangled.org/core/eventconsumer" 35 "tangled.org/core/idresolver" 36 "tangled.org/core/jetstream" 37 tlog "tangled.org/core/log" 38 "tangled.org/core/rbac" 39 "tangled.org/core/tid" 40) 41 42type State struct { 43 db *db.DB 44 notifier notify.Notifier 45 oauth *oauth.OAuth 46 enforcer *rbac.Enforcer 47 pages *pages.Pages 48 sess *session.SessionStore 49 idResolver *idresolver.Resolver 50 posthog posthog.Client 51 jc *jetstream.JetstreamClient 52 config *config.Config 53 repoResolver *reporesolver.RepoResolver 54 knotstream *eventconsumer.Consumer 55 spindlestream *eventconsumer.Consumer 56 logger *slog.Logger 57 validator *validator.Validator 58} 59 60func Make(ctx context.Context, config *config.Config) (*State, error) { 61 d, err := db.Make(config.Core.DbPath) 62 if err != nil { 63 return nil, fmt.Errorf("failed to create db: %w", err) 64 } 65 66 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 67 if err != nil { 68 return nil, fmt.Errorf("failed to create enforcer: %w", err) 69 } 70 71 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 72 if err != nil { 73 log.Printf("failed to create redis resolver: %v", err) 74 res = idresolver.DefaultResolver() 75 } 76 77 pgs := pages.NewPages(config, res) 78 cache := cache.New(config.Redis.Addr) 79 sess := session.New(cache) 80 oauth := oauth.NewOAuth(config, sess) 81 validator := validator.New(d, res) 82 83 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 84 if err != nil { 85 return nil, fmt.Errorf("failed to create posthog client: %w", err) 86 } 87 88 repoResolver := reporesolver.New(config, enforcer, res, d) 89 90 wrapper := db.DbWrapper{d} 91 jc, err := jetstream.NewJetstreamClient( 92 config.Jetstream.Endpoint, 93 "appview", 94 []string{ 95 tangled.GraphFollowNSID, 96 tangled.FeedStarNSID, 97 tangled.PublicKeyNSID, 98 tangled.RepoArtifactNSID, 99 tangled.ActorProfileNSID, 100 tangled.SpindleMemberNSID, 101 tangled.SpindleNSID, 102 tangled.StringNSID, 103 tangled.RepoIssueNSID, 104 tangled.RepoIssueCommentNSID, 105 tangled.LabelDefinitionNSID, 106 }, 107 nil, 108 slog.Default(), 109 wrapper, 110 false, 111 112 // in-memory filter is inapplicalble to appview so 113 // we'll never log dids anyway. 114 false, 115 ) 116 if err != nil { 117 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 118 } 119 120 ingester := appview.Ingester{ 121 Db: wrapper, 122 Enforcer: enforcer, 123 IdResolver: res, 124 Config: config, 125 Logger: tlog.New("ingester"), 126 Validator: validator, 127 } 128 err = jc.StartJetstream(ctx, ingester.Ingest()) 129 if err != nil { 130 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 131 } 132 133 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 134 if err != nil { 135 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 136 } 137 knotstream.Start(ctx) 138 139 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 140 if err != nil { 141 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 142 } 143 spindlestream.Start(ctx) 144 145 var notifiers []notify.Notifier 146 if !config.Core.Dev { 147 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog)) 148 } 149 notifier := notify.NewMergedNotifier(notifiers...) 150 151 state := &State{ 152 d, 153 notifier, 154 oauth, 155 enforcer, 156 pgs, 157 sess, 158 res, 159 posthog, 160 jc, 161 config, 162 repoResolver, 163 knotstream, 164 spindlestream, 165 slog.Default(), 166 validator, 167 } 168 169 return state, nil 170} 171 172func (s *State) Close() error { 173 // other close up logic goes here 174 return s.db.Close() 175} 176 177func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 178 w.Header().Set("Content-Type", "image/svg+xml") 179 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 180 w.Header().Set("ETag", `"favicon-svg-v1"`) 181 182 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 183 w.WriteHeader(http.StatusNotModified) 184 return 185 } 186 187 s.pages.Favicon(w) 188} 189 190func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 191 user := s.oauth.GetUser(r) 192 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 193 LoggedInUser: user, 194 }) 195} 196 197func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 198 user := s.oauth.GetUser(r) 199 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 200 LoggedInUser: user, 201 }) 202} 203 204func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 205 if s.oauth.GetUser(r) != nil { 206 s.Timeline(w, r) 207 return 208 } 209 s.Home(w, r) 210} 211 212func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 213 user := s.oauth.GetUser(r) 214 215 var userDid string 216 if user != nil { 217 userDid = user.Did 218 } 219 timeline, err := db.MakeTimeline(s.db, 50, userDid) 220 if err != nil { 221 log.Println(err) 222 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 223 } 224 225 repos, err := db.GetTopStarredReposLastWeek(s.db) 226 if err != nil { 227 log.Println(err) 228 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 229 return 230 } 231 232 s.pages.Timeline(w, pages.TimelineParams{ 233 LoggedInUser: user, 234 Timeline: timeline, 235 Repos: repos, 236 }) 237} 238 239func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 240 user := s.oauth.GetUser(r) 241 l := s.logger.With("handler", "UpgradeBanner") 242 l = l.With("did", user.Did) 243 l = l.With("handle", user.Handle) 244 245 regs, err := db.GetRegistrations( 246 s.db, 247 db.FilterEq("did", user.Did), 248 db.FilterEq("needs_upgrade", 1), 249 ) 250 if err != nil { 251 l.Error("non-fatal: failed to get registrations", "err", err) 252 } 253 254 spindles, err := db.GetSpindles( 255 s.db, 256 db.FilterEq("owner", user.Did), 257 db.FilterEq("needs_upgrade", 1), 258 ) 259 if err != nil { 260 l.Error("non-fatal: failed to get spindles", "err", err) 261 } 262 263 if regs == nil && spindles == nil { 264 return 265 } 266 267 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 268 Registrations: regs, 269 Spindles: spindles, 270 }) 271} 272 273func (s *State) Home(w http.ResponseWriter, r *http.Request) { 274 timeline, err := db.MakeTimeline(s.db, 5, "") 275 if err != nil { 276 log.Println(err) 277 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 278 return 279 } 280 281 repos, err := db.GetTopStarredReposLastWeek(s.db) 282 if err != nil { 283 log.Println(err) 284 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 285 return 286 } 287 288 s.pages.Home(w, pages.TimelineParams{ 289 LoggedInUser: nil, 290 Timeline: timeline, 291 Repos: repos, 292 }) 293} 294 295func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 296 user := chi.URLParam(r, "user") 297 user = strings.TrimPrefix(user, "@") 298 299 if user == "" { 300 w.WriteHeader(http.StatusBadRequest) 301 return 302 } 303 304 id, err := s.idResolver.ResolveIdent(r.Context(), user) 305 if err != nil { 306 w.WriteHeader(http.StatusInternalServerError) 307 return 308 } 309 310 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 311 if err != nil { 312 w.WriteHeader(http.StatusNotFound) 313 return 314 } 315 316 if len(pubKeys) == 0 { 317 w.WriteHeader(http.StatusNotFound) 318 return 319 } 320 321 for _, k := range pubKeys { 322 key := strings.TrimRight(k.Key, "\n") 323 fmt.Fprintln(w, key) 324 } 325} 326 327func validateRepoName(name string) error { 328 // check for path traversal attempts 329 if name == "." || name == ".." || 330 strings.Contains(name, "/") || strings.Contains(name, "\\") { 331 return fmt.Errorf("Repository name contains invalid path characters") 332 } 333 334 // check for sequences that could be used for traversal when normalized 335 if strings.Contains(name, "./") || strings.Contains(name, "../") || 336 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 337 return fmt.Errorf("Repository name contains invalid path sequence") 338 } 339 340 // then continue with character validation 341 for _, char := range name { 342 if !((char >= 'a' && char <= 'z') || 343 (char >= 'A' && char <= 'Z') || 344 (char >= '0' && char <= '9') || 345 char == '-' || char == '_' || char == '.') { 346 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 347 } 348 } 349 350 // additional check to prevent multiple sequential dots 351 if strings.Contains(name, "..") { 352 return fmt.Errorf("Repository name cannot contain sequential dots") 353 } 354 355 // if all checks pass 356 return nil 357} 358 359func stripGitExt(name string) string { 360 return strings.TrimSuffix(name, ".git") 361} 362 363func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 364 switch r.Method { 365 case http.MethodGet: 366 user := s.oauth.GetUser(r) 367 knots, err := s.enforcer.GetKnotsForUser(user.Did) 368 if err != nil { 369 s.pages.Notice(w, "repo", "Invalid user account.") 370 return 371 } 372 373 s.pages.NewRepo(w, pages.NewRepoParams{ 374 LoggedInUser: user, 375 Knots: knots, 376 }) 377 378 case http.MethodPost: 379 l := s.logger.With("handler", "NewRepo") 380 381 user := s.oauth.GetUser(r) 382 l = l.With("did", user.Did) 383 l = l.With("handle", user.Handle) 384 385 // form validation 386 domain := r.FormValue("domain") 387 if domain == "" { 388 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 389 return 390 } 391 l = l.With("knot", domain) 392 393 repoName := r.FormValue("name") 394 if repoName == "" { 395 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 396 return 397 } 398 399 if err := validateRepoName(repoName); err != nil { 400 s.pages.Notice(w, "repo", err.Error()) 401 return 402 } 403 repoName = stripGitExt(repoName) 404 l = l.With("repoName", repoName) 405 406 defaultBranch := r.FormValue("branch") 407 if defaultBranch == "" { 408 defaultBranch = "main" 409 } 410 l = l.With("defaultBranch", defaultBranch) 411 412 description := r.FormValue("description") 413 414 // ACL validation 415 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 416 if err != nil || !ok { 417 l.Info("unauthorized") 418 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 419 return 420 } 421 422 // Check for existing repos 423 existingRepo, err := db.GetRepo( 424 s.db, 425 db.FilterEq("did", user.Did), 426 db.FilterEq("name", repoName), 427 ) 428 if err == nil && existingRepo != nil { 429 l.Info("repo exists") 430 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 431 return 432 } 433 434 // create atproto record for this repo 435 rkey := tid.TID() 436 repo := &models.Repo{ 437 Did: user.Did, 438 Name: repoName, 439 Knot: domain, 440 Rkey: rkey, 441 Description: description, 442 Created: time.Now(), 443 } 444 record := repo.AsRecord() 445 446 xrpcClient, err := s.oauth.AuthorizedClient(r) 447 if err != nil { 448 l.Info("PDS write failed", "err", err) 449 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 450 return 451 } 452 453 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 454 Collection: tangled.RepoNSID, 455 Repo: user.Did, 456 Rkey: rkey, 457 Record: &lexutil.LexiconTypeDecoder{ 458 Val: &record, 459 }, 460 }) 461 if err != nil { 462 l.Info("PDS write failed", "err", err) 463 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 464 return 465 } 466 467 aturi := atresp.Uri 468 l = l.With("aturi", aturi) 469 l.Info("wrote to PDS") 470 471 tx, err := s.db.BeginTx(r.Context(), nil) 472 if err != nil { 473 l.Info("txn failed", "err", err) 474 s.pages.Notice(w, "repo", "Failed to save repository information.") 475 return 476 } 477 478 // The rollback function reverts a few things on failure: 479 // - the pending txn 480 // - the ACLs 481 // - the atproto record created 482 rollback := func() { 483 err1 := tx.Rollback() 484 err2 := s.enforcer.E.LoadPolicy() 485 err3 := rollbackRecord(context.Background(), aturi, xrpcClient) 486 487 // ignore txn complete errors, this is okay 488 if errors.Is(err1, sql.ErrTxDone) { 489 err1 = nil 490 } 491 492 if errs := errors.Join(err1, err2, err3); errs != nil { 493 l.Error("failed to rollback changes", "errs", errs) 494 return 495 } 496 } 497 defer rollback() 498 499 client, err := s.oauth.ServiceClient( 500 r, 501 oauth.WithService(domain), 502 oauth.WithLxm(tangled.RepoCreateNSID), 503 oauth.WithDev(s.config.Core.Dev), 504 ) 505 if err != nil { 506 l.Error("service auth failed", "err", err) 507 s.pages.Notice(w, "repo", "Failed to reach PDS.") 508 return 509 } 510 511 xe := tangled.RepoCreate( 512 r.Context(), 513 client, 514 &tangled.RepoCreate_Input{ 515 Rkey: rkey, 516 }, 517 ) 518 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 519 l.Error("xrpc error", "xe", xe) 520 s.pages.Notice(w, "repo", err.Error()) 521 return 522 } 523 524 err = db.AddRepo(tx, repo) 525 if err != nil { 526 l.Error("db write failed", "err", err) 527 s.pages.Notice(w, "repo", "Failed to save repository information.") 528 return 529 } 530 531 // acls 532 p, _ := securejoin.SecureJoin(user.Did, repoName) 533 err = s.enforcer.AddRepo(user.Did, domain, p) 534 if err != nil { 535 l.Error("acl setup failed", "err", err) 536 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 537 return 538 } 539 540 err = tx.Commit() 541 if err != nil { 542 l.Error("txn commit failed", "err", err) 543 http.Error(w, err.Error(), http.StatusInternalServerError) 544 return 545 } 546 547 err = s.enforcer.E.SavePolicy() 548 if err != nil { 549 l.Error("acl save failed", "err", err) 550 http.Error(w, err.Error(), http.StatusInternalServerError) 551 return 552 } 553 554 // reset the ATURI because the transaction completed successfully 555 aturi = "" 556 557 s.notifier.NewRepo(r.Context(), repo) 558 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 559 } 560} 561 562// this is used to rollback changes made to the PDS 563// 564// it is a no-op if the provided ATURI is empty 565func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 566 if aturi == "" { 567 return nil 568 } 569 570 parsed := syntax.ATURI(aturi) 571 572 collection := parsed.Collection().String() 573 repo := parsed.Authority().String() 574 rkey := parsed.RecordKey().String() 575 576 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 577 Collection: collection, 578 Repo: repo, 579 Rkey: rkey, 580 }) 581 return err 582}