forked from tangled.org/core
this repo has no description
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 securejoin "github.com/cyphar/filepath-securejoin" 18 "github.com/go-chi/chi/v5" 19 "github.com/posthog/posthog-go" 20 "tangled.sh/tangled.sh/core/api/tangled" 21 "tangled.sh/tangled.sh/core/appview" 22 "tangled.sh/tangled.sh/core/appview/cache" 23 "tangled.sh/tangled.sh/core/appview/cache/session" 24 "tangled.sh/tangled.sh/core/appview/config" 25 "tangled.sh/tangled.sh/core/appview/db" 26 "tangled.sh/tangled.sh/core/appview/notify" 27 "tangled.sh/tangled.sh/core/appview/oauth" 28 "tangled.sh/tangled.sh/core/appview/pages" 29 posthogService "tangled.sh/tangled.sh/core/appview/posthog" 30 "tangled.sh/tangled.sh/core/appview/reporesolver" 31 xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient" 32 "tangled.sh/tangled.sh/core/eventconsumer" 33 "tangled.sh/tangled.sh/core/idresolver" 34 "tangled.sh/tangled.sh/core/jetstream" 35 tlog "tangled.sh/tangled.sh/core/log" 36 "tangled.sh/tangled.sh/core/rbac" 37 "tangled.sh/tangled.sh/core/tid" 38 // xrpcerr "tangled.sh/tangled.sh/core/xrpc/errors" 39) 40 41type State struct { 42 db *db.DB 43 notifier notify.Notifier 44 oauth *oauth.OAuth 45 enforcer *rbac.Enforcer 46 pages *pages.Pages 47 sess *session.SessionStore 48 idResolver *idresolver.Resolver 49 posthog posthog.Client 50 jc *jetstream.JetstreamClient 51 config *config.Config 52 repoResolver *reporesolver.RepoResolver 53 knotstream *eventconsumer.Consumer 54 spindlestream *eventconsumer.Consumer 55 logger *slog.Logger 56} 57 58func Make(ctx context.Context, config *config.Config) (*State, error) { 59 d, err := db.Make(config.Core.DbPath) 60 if err != nil { 61 return nil, fmt.Errorf("failed to create db: %w", err) 62 } 63 64 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 65 if err != nil { 66 return nil, fmt.Errorf("failed to create enforcer: %w", err) 67 } 68 69 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 70 if err != nil { 71 log.Printf("failed to create redis resolver: %v", err) 72 res = idresolver.DefaultResolver() 73 } 74 75 pgs := pages.NewPages(config, res) 76 77 cache := cache.New(config.Redis.Addr) 78 sess := session.New(cache) 79 80 oauth := oauth.NewOAuth(config, sess) 81 82 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create posthog client: %w", err) 85 } 86 87 repoResolver := reporesolver.New(config, enforcer, res, d) 88 89 wrapper := db.DbWrapper{d} 90 jc, err := jetstream.NewJetstreamClient( 91 config.Jetstream.Endpoint, 92 "appview", 93 []string{ 94 tangled.GraphFollowNSID, 95 tangled.FeedStarNSID, 96 tangled.PublicKeyNSID, 97 tangled.RepoArtifactNSID, 98 tangled.ActorProfileNSID, 99 tangled.SpindleMemberNSID, 100 tangled.SpindleNSID, 101 tangled.StringNSID, 102 tangled.RepoIssueNSID, 103 tangled.RepoIssueCommentNSID, 104 }, 105 nil, 106 slog.Default(), 107 wrapper, 108 false, 109 110 // in-memory filter is inapplicalble to appview so 111 // we'll never log dids anyway. 112 false, 113 ) 114 if err != nil { 115 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 116 } 117 118 ingester := appview.Ingester{ 119 Db: wrapper, 120 Enforcer: enforcer, 121 IdResolver: res, 122 Config: config, 123 Logger: tlog.New("ingester"), 124 } 125 err = jc.StartJetstream(ctx, ingester.Ingest()) 126 if err != nil { 127 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 128 } 129 130 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 131 if err != nil { 132 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 133 } 134 knotstream.Start(ctx) 135 136 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 137 if err != nil { 138 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 139 } 140 spindlestream.Start(ctx) 141 142 var notifiers []notify.Notifier 143 if !config.Core.Dev { 144 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog)) 145 } 146 notifier := notify.NewMergedNotifier(notifiers...) 147 148 state := &State{ 149 d, 150 notifier, 151 oauth, 152 enforcer, 153 pgs, 154 sess, 155 res, 156 posthog, 157 jc, 158 config, 159 repoResolver, 160 knotstream, 161 spindlestream, 162 slog.Default(), 163 } 164 165 return state, nil 166} 167 168func (s *State) Close() error { 169 // other close up logic goes here 170 return s.db.Close() 171} 172 173func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 174 w.Header().Set("Content-Type", "image/svg+xml") 175 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 176 w.Header().Set("ETag", `"favicon-svg-v1"`) 177 178 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 179 w.WriteHeader(http.StatusNotModified) 180 return 181 } 182 183 s.pages.Favicon(w) 184} 185 186func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 187 user := s.oauth.GetUser(r) 188 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 189 LoggedInUser: user, 190 }) 191} 192 193func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 194 user := s.oauth.GetUser(r) 195 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 196 LoggedInUser: user, 197 }) 198} 199 200func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 201 if s.oauth.GetUser(r) != nil { 202 s.Timeline(w, r) 203 return 204 } 205 s.Home(w, r) 206} 207 208func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 209 user := s.oauth.GetUser(r) 210 211 timeline, err := db.MakeTimeline(s.db, 50) 212 if err != nil { 213 log.Println(err) 214 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 215 } 216 217 repos, err := db.GetTopStarredReposLastWeek(s.db) 218 if err != nil { 219 log.Println(err) 220 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 221 return 222 } 223 224 s.pages.Timeline(w, pages.TimelineParams{ 225 LoggedInUser: user, 226 Timeline: timeline, 227 Repos: repos, 228 }) 229} 230 231func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 232 user := s.oauth.GetUser(r) 233 l := s.logger.With("handler", "UpgradeBanner") 234 l = l.With("did", user.Did) 235 l = l.With("handle", user.Handle) 236 237 regs, err := db.GetRegistrations( 238 s.db, 239 db.FilterEq("did", user.Did), 240 db.FilterEq("needs_upgrade", 1), 241 ) 242 if err != nil { 243 l.Error("non-fatal: failed to get registrations", "err", err) 244 } 245 246 spindles, err := db.GetSpindles( 247 s.db, 248 db.FilterEq("owner", user.Did), 249 db.FilterEq("needs_upgrade", 1), 250 ) 251 if err != nil { 252 l.Error("non-fatal: failed to get spindles", "err", err) 253 } 254 255 if regs == nil && spindles == nil { 256 return 257 } 258 259 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 260 Registrations: regs, 261 Spindles: spindles, 262 }) 263} 264 265func (s *State) Home(w http.ResponseWriter, r *http.Request) { 266 timeline, err := db.MakeTimeline(s.db, 5) 267 if err != nil { 268 log.Println(err) 269 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 270 return 271 } 272 273 repos, err := db.GetTopStarredReposLastWeek(s.db) 274 if err != nil { 275 log.Println(err) 276 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 277 return 278 } 279 280 s.pages.Home(w, pages.TimelineParams{ 281 LoggedInUser: nil, 282 Timeline: timeline, 283 Repos: repos, 284 }) 285} 286 287func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 288 user := chi.URLParam(r, "user") 289 user = strings.TrimPrefix(user, "@") 290 291 if user == "" { 292 w.WriteHeader(http.StatusBadRequest) 293 return 294 } 295 296 id, err := s.idResolver.ResolveIdent(r.Context(), user) 297 if err != nil { 298 w.WriteHeader(http.StatusInternalServerError) 299 return 300 } 301 302 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 303 if err != nil { 304 w.WriteHeader(http.StatusNotFound) 305 return 306 } 307 308 if len(pubKeys) == 0 { 309 w.WriteHeader(http.StatusNotFound) 310 return 311 } 312 313 for _, k := range pubKeys { 314 key := strings.TrimRight(k.Key, "\n") 315 fmt.Fprintln(w, key) 316 } 317} 318 319func validateRepoName(name string) error { 320 // check for path traversal attempts 321 if name == "." || name == ".." || 322 strings.Contains(name, "/") || strings.Contains(name, "\\") { 323 return fmt.Errorf("Repository name contains invalid path characters") 324 } 325 326 // check for sequences that could be used for traversal when normalized 327 if strings.Contains(name, "./") || strings.Contains(name, "../") || 328 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 329 return fmt.Errorf("Repository name contains invalid path sequence") 330 } 331 332 // then continue with character validation 333 for _, char := range name { 334 if !((char >= 'a' && char <= 'z') || 335 (char >= 'A' && char <= 'Z') || 336 (char >= '0' && char <= '9') || 337 char == '-' || char == '_' || char == '.') { 338 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 339 } 340 } 341 342 // additional check to prevent multiple sequential dots 343 if strings.Contains(name, "..") { 344 return fmt.Errorf("Repository name cannot contain sequential dots") 345 } 346 347 // if all checks pass 348 return nil 349} 350 351func stripGitExt(name string) string { 352 return strings.TrimSuffix(name, ".git") 353} 354 355func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 356 switch r.Method { 357 case http.MethodGet: 358 user := s.oauth.GetUser(r) 359 knots, err := s.enforcer.GetKnotsForUser(user.Did) 360 if err != nil { 361 s.pages.Notice(w, "repo", "Invalid user account.") 362 return 363 } 364 365 s.pages.NewRepo(w, pages.NewRepoParams{ 366 LoggedInUser: user, 367 Knots: knots, 368 }) 369 370 case http.MethodPost: 371 l := s.logger.With("handler", "NewRepo") 372 373 user := s.oauth.GetUser(r) 374 l = l.With("did", user.Did) 375 l = l.With("handle", user.Handle) 376 377 // form validation 378 domain := r.FormValue("domain") 379 if domain == "" { 380 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 381 return 382 } 383 l = l.With("knot", domain) 384 385 repoName := r.FormValue("name") 386 if repoName == "" { 387 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 388 return 389 } 390 391 if err := validateRepoName(repoName); err != nil { 392 s.pages.Notice(w, "repo", err.Error()) 393 return 394 } 395 repoName = stripGitExt(repoName) 396 l = l.With("repoName", repoName) 397 398 defaultBranch := r.FormValue("branch") 399 if defaultBranch == "" { 400 defaultBranch = "main" 401 } 402 l = l.With("defaultBranch", defaultBranch) 403 404 description := r.FormValue("description") 405 406 // ACL validation 407 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 408 if err != nil || !ok { 409 l.Info("unauthorized") 410 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 411 return 412 } 413 414 // Check for existing repos 415 existingRepo, err := db.GetRepo(s.db, user.Did, repoName) 416 if err == nil && existingRepo != nil { 417 l.Info("repo exists") 418 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 419 return 420 } 421 422 // create atproto record for this repo 423 rkey := tid.TID() 424 repo := &db.Repo{ 425 Did: user.Did, 426 Name: repoName, 427 Knot: domain, 428 Rkey: rkey, 429 Description: description, 430 } 431 432 xrpcClient, err := s.oauth.AuthorizedClient(r) 433 if err != nil { 434 l.Info("PDS write failed", "err", err) 435 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 436 return 437 } 438 439 createdAt := time.Now().Format(time.RFC3339) 440 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 441 Collection: tangled.RepoNSID, 442 Repo: user.Did, 443 Rkey: rkey, 444 Record: &lexutil.LexiconTypeDecoder{ 445 Val: &tangled.Repo{ 446 Knot: repo.Knot, 447 Name: repoName, 448 CreatedAt: createdAt, 449 Owner: user.Did, 450 }}, 451 }) 452 if err != nil { 453 l.Info("PDS write failed", "err", err) 454 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 455 return 456 } 457 458 aturi := atresp.Uri 459 l = l.With("aturi", aturi) 460 l.Info("wrote to PDS") 461 462 tx, err := s.db.BeginTx(r.Context(), nil) 463 if err != nil { 464 l.Info("txn failed", "err", err) 465 s.pages.Notice(w, "repo", "Failed to save repository information.") 466 return 467 } 468 469 // The rollback function reverts a few things on failure: 470 // - the pending txn 471 // - the ACLs 472 // - the atproto record created 473 rollback := func() { 474 err1 := tx.Rollback() 475 err2 := s.enforcer.E.LoadPolicy() 476 err3 := rollbackRecord(context.Background(), aturi, xrpcClient) 477 478 // ignore txn complete errors, this is okay 479 if errors.Is(err1, sql.ErrTxDone) { 480 err1 = nil 481 } 482 483 if errs := errors.Join(err1, err2, err3); errs != nil { 484 l.Error("failed to rollback changes", "errs", errs) 485 return 486 } 487 } 488 defer rollback() 489 490 client, err := s.oauth.ServiceClient( 491 r, 492 oauth.WithService(domain), 493 oauth.WithLxm(tangled.RepoCreateNSID), 494 oauth.WithDev(s.config.Core.Dev), 495 ) 496 if err != nil { 497 l.Error("service auth failed", "err", err) 498 s.pages.Notice(w, "repo", "Failed to reach PDS.") 499 return 500 } 501 502 xe := tangled.RepoCreate( 503 r.Context(), 504 client, 505 &tangled.RepoCreate_Input{ 506 Rkey: rkey, 507 }, 508 ) 509 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 510 l.Error("xrpc error", "xe", xe) 511 s.pages.Notice(w, "repo", err.Error()) 512 return 513 } 514 515 err = db.AddRepo(tx, repo) 516 if err != nil { 517 l.Error("db write failed", "err", err) 518 s.pages.Notice(w, "repo", "Failed to save repository information.") 519 return 520 } 521 522 // acls 523 p, _ := securejoin.SecureJoin(user.Did, repoName) 524 err = s.enforcer.AddRepo(user.Did, domain, p) 525 if err != nil { 526 l.Error("acl setup failed", "err", err) 527 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 528 return 529 } 530 531 err = tx.Commit() 532 if err != nil { 533 l.Error("txn commit failed", "err", err) 534 http.Error(w, err.Error(), http.StatusInternalServerError) 535 return 536 } 537 538 err = s.enforcer.E.SavePolicy() 539 if err != nil { 540 l.Error("acl save failed", "err", err) 541 http.Error(w, err.Error(), http.StatusInternalServerError) 542 return 543 } 544 545 // reset the ATURI because the transaction completed successfully 546 aturi = "" 547 548 s.notifier.NewRepo(r.Context(), repo) 549 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 550 } 551} 552 553// this is used to rollback changes made to the PDS 554// 555// it is a no-op if the provided ATURI is empty 556func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 557 if aturi == "" { 558 return nil 559 } 560 561 parsed := syntax.ATURI(aturi) 562 563 collection := parsed.Collection().String() 564 repo := parsed.Authority().String() 565 rkey := parsed.RecordKey().String() 566 567 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 568 Collection: collection, 569 Repo: repo, 570 Rkey: rkey, 571 }) 572 return err 573}