forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 securejoin "github.com/cyphar/filepath-securejoin" 18 "github.com/go-chi/chi/v5" 19 "github.com/posthog/posthog-go" 20 "tangled.sh/tangled.sh/core/api/tangled" 21 "tangled.sh/tangled.sh/core/appview" 22 "tangled.sh/tangled.sh/core/appview/cache" 23 "tangled.sh/tangled.sh/core/appview/cache/session" 24 "tangled.sh/tangled.sh/core/appview/config" 25 "tangled.sh/tangled.sh/core/appview/db" 26 "tangled.sh/tangled.sh/core/appview/notify" 27 "tangled.sh/tangled.sh/core/appview/oauth" 28 "tangled.sh/tangled.sh/core/appview/pages" 29 posthogService "tangled.sh/tangled.sh/core/appview/posthog" 30 "tangled.sh/tangled.sh/core/appview/reporesolver" 31 "tangled.sh/tangled.sh/core/appview/validator" 32 xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient" 33 "tangled.sh/tangled.sh/core/eventconsumer" 34 "tangled.sh/tangled.sh/core/idresolver" 35 "tangled.sh/tangled.sh/core/jetstream" 36 tlog "tangled.sh/tangled.sh/core/log" 37 "tangled.sh/tangled.sh/core/rbac" 38 "tangled.sh/tangled.sh/core/tid" 39 // xrpcerr "tangled.sh/tangled.sh/core/xrpc/errors" 40) 41 42type State struct { 43 db *db.DB 44 notifier notify.Notifier 45 oauth *oauth.OAuth 46 enforcer *rbac.Enforcer 47 pages *pages.Pages 48 sess *session.SessionStore 49 idResolver *idresolver.Resolver 50 posthog posthog.Client 51 jc *jetstream.JetstreamClient 52 config *config.Config 53 repoResolver *reporesolver.RepoResolver 54 knotstream *eventconsumer.Consumer 55 spindlestream *eventconsumer.Consumer 56 logger *slog.Logger 57 validator *validator.Validator 58} 59 60func Make(ctx context.Context, config *config.Config) (*State, error) { 61 d, err := db.Make(config.Core.DbPath) 62 if err != nil { 63 return nil, fmt.Errorf("failed to create db: %w", err) 64 } 65 66 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 67 if err != nil { 68 return nil, fmt.Errorf("failed to create enforcer: %w", err) 69 } 70 71 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 72 if err != nil { 73 log.Printf("failed to create redis resolver: %v", err) 74 res = idresolver.DefaultResolver() 75 } 76 77 pgs := pages.NewPages(config, res) 78 cache := cache.New(config.Redis.Addr) 79 sess := session.New(cache) 80 oauth := oauth.NewOAuth(config, sess) 81 validator := validator.New(d) 82 83 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 84 if err != nil { 85 return nil, fmt.Errorf("failed to create posthog client: %w", err) 86 } 87 88 repoResolver := reporesolver.New(config, enforcer, res, d) 89 90 wrapper := db.DbWrapper{d} 91 jc, err := jetstream.NewJetstreamClient( 92 config.Jetstream.Endpoint, 93 "appview", 94 []string{ 95 tangled.GraphFollowNSID, 96 tangled.FeedStarNSID, 97 tangled.PublicKeyNSID, 98 tangled.RepoArtifactNSID, 99 tangled.ActorProfileNSID, 100 tangled.SpindleMemberNSID, 101 tangled.SpindleNSID, 102 tangled.StringNSID, 103 tangled.RepoIssueNSID, 104 tangled.RepoIssueCommentNSID, 105 }, 106 nil, 107 slog.Default(), 108 wrapper, 109 false, 110 111 // in-memory filter is inapplicalble to appview so 112 // we'll never log dids anyway. 113 false, 114 ) 115 if err != nil { 116 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 117 } 118 119 ingester := appview.Ingester{ 120 Db: wrapper, 121 Enforcer: enforcer, 122 IdResolver: res, 123 Config: config, 124 Logger: tlog.New("ingester"), 125 Validator: validator, 126 } 127 err = jc.StartJetstream(ctx, ingester.Ingest()) 128 if err != nil { 129 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 130 } 131 132 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 133 if err != nil { 134 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 135 } 136 knotstream.Start(ctx) 137 138 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 139 if err != nil { 140 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 141 } 142 spindlestream.Start(ctx) 143 144 var notifiers []notify.Notifier 145 if !config.Core.Dev { 146 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog)) 147 } 148 notifier := notify.NewMergedNotifier(notifiers...) 149 150 state := &State{ 151 d, 152 notifier, 153 oauth, 154 enforcer, 155 pgs, 156 sess, 157 res, 158 posthog, 159 jc, 160 config, 161 repoResolver, 162 knotstream, 163 spindlestream, 164 slog.Default(), 165 validator, 166 } 167 168 return state, nil 169} 170 171func (s *State) Close() error { 172 // other close up logic goes here 173 return s.db.Close() 174} 175 176func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 177 w.Header().Set("Content-Type", "image/svg+xml") 178 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 179 w.Header().Set("ETag", `"favicon-svg-v1"`) 180 181 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 182 w.WriteHeader(http.StatusNotModified) 183 return 184 } 185 186 s.pages.Favicon(w) 187} 188 189func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 190 user := s.oauth.GetUser(r) 191 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 192 LoggedInUser: user, 193 }) 194} 195 196func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 197 user := s.oauth.GetUser(r) 198 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 199 LoggedInUser: user, 200 }) 201} 202 203func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 204 if s.oauth.GetUser(r) != nil { 205 s.Timeline(w, r) 206 return 207 } 208 s.Home(w, r) 209} 210 211func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 212 user := s.oauth.GetUser(r) 213 214 timeline, err := db.MakeTimeline(s.db, 50) 215 if err != nil { 216 log.Println(err) 217 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 218 } 219 220 repos, err := db.GetTopStarredReposLastWeek(s.db) 221 if err != nil { 222 log.Println(err) 223 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 224 return 225 } 226 227 s.pages.Timeline(w, pages.TimelineParams{ 228 LoggedInUser: user, 229 Timeline: timeline, 230 Repos: repos, 231 }) 232} 233 234func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 235 user := s.oauth.GetUser(r) 236 l := s.logger.With("handler", "UpgradeBanner") 237 l = l.With("did", user.Did) 238 l = l.With("handle", user.Handle) 239 240 regs, err := db.GetRegistrations( 241 s.db, 242 db.FilterEq("did", user.Did), 243 db.FilterEq("needs_upgrade", 1), 244 ) 245 if err != nil { 246 l.Error("non-fatal: failed to get registrations", "err", err) 247 } 248 249 spindles, err := db.GetSpindles( 250 s.db, 251 db.FilterEq("owner", user.Did), 252 db.FilterEq("needs_upgrade", 1), 253 ) 254 if err != nil { 255 l.Error("non-fatal: failed to get spindles", "err", err) 256 } 257 258 if regs == nil && spindles == nil { 259 return 260 } 261 262 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 263 Registrations: regs, 264 Spindles: spindles, 265 }) 266} 267 268func (s *State) Home(w http.ResponseWriter, r *http.Request) { 269 timeline, err := db.MakeTimeline(s.db, 5) 270 if err != nil { 271 log.Println(err) 272 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 273 return 274 } 275 276 repos, err := db.GetTopStarredReposLastWeek(s.db) 277 if err != nil { 278 log.Println(err) 279 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 280 return 281 } 282 283 s.pages.Home(w, pages.TimelineParams{ 284 LoggedInUser: nil, 285 Timeline: timeline, 286 Repos: repos, 287 }) 288} 289 290func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 291 user := chi.URLParam(r, "user") 292 user = strings.TrimPrefix(user, "@") 293 294 if user == "" { 295 w.WriteHeader(http.StatusBadRequest) 296 return 297 } 298 299 id, err := s.idResolver.ResolveIdent(r.Context(), user) 300 if err != nil { 301 w.WriteHeader(http.StatusInternalServerError) 302 return 303 } 304 305 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 306 if err != nil { 307 w.WriteHeader(http.StatusNotFound) 308 return 309 } 310 311 if len(pubKeys) == 0 { 312 w.WriteHeader(http.StatusNotFound) 313 return 314 } 315 316 for _, k := range pubKeys { 317 key := strings.TrimRight(k.Key, "\n") 318 fmt.Fprintln(w, key) 319 } 320} 321 322func validateRepoName(name string) error { 323 // check for path traversal attempts 324 if name == "." || name == ".." || 325 strings.Contains(name, "/") || strings.Contains(name, "\\") { 326 return fmt.Errorf("Repository name contains invalid path characters") 327 } 328 329 // check for sequences that could be used for traversal when normalized 330 if strings.Contains(name, "./") || strings.Contains(name, "../") || 331 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 332 return fmt.Errorf("Repository name contains invalid path sequence") 333 } 334 335 // then continue with character validation 336 for _, char := range name { 337 if !((char >= 'a' && char <= 'z') || 338 (char >= 'A' && char <= 'Z') || 339 (char >= '0' && char <= '9') || 340 char == '-' || char == '_' || char == '.') { 341 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 342 } 343 } 344 345 // additional check to prevent multiple sequential dots 346 if strings.Contains(name, "..") { 347 return fmt.Errorf("Repository name cannot contain sequential dots") 348 } 349 350 // if all checks pass 351 return nil 352} 353 354func stripGitExt(name string) string { 355 return strings.TrimSuffix(name, ".git") 356} 357 358func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 359 switch r.Method { 360 case http.MethodGet: 361 user := s.oauth.GetUser(r) 362 knots, err := s.enforcer.GetKnotsForUser(user.Did) 363 if err != nil { 364 s.pages.Notice(w, "repo", "Invalid user account.") 365 return 366 } 367 368 s.pages.NewRepo(w, pages.NewRepoParams{ 369 LoggedInUser: user, 370 Knots: knots, 371 }) 372 373 case http.MethodPost: 374 l := s.logger.With("handler", "NewRepo") 375 376 user := s.oauth.GetUser(r) 377 l = l.With("did", user.Did) 378 l = l.With("handle", user.Handle) 379 380 // form validation 381 domain := r.FormValue("domain") 382 if domain == "" { 383 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 384 return 385 } 386 l = l.With("knot", domain) 387 388 repoName := r.FormValue("name") 389 if repoName == "" { 390 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 391 return 392 } 393 394 if err := validateRepoName(repoName); err != nil { 395 s.pages.Notice(w, "repo", err.Error()) 396 return 397 } 398 repoName = stripGitExt(repoName) 399 l = l.With("repoName", repoName) 400 401 defaultBranch := r.FormValue("branch") 402 if defaultBranch == "" { 403 defaultBranch = "main" 404 } 405 l = l.With("defaultBranch", defaultBranch) 406 407 description := r.FormValue("description") 408 409 // ACL validation 410 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 411 if err != nil || !ok { 412 l.Info("unauthorized") 413 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 414 return 415 } 416 417 // Check for existing repos 418 existingRepo, err := db.GetRepo(s.db, user.Did, repoName) 419 if err == nil && existingRepo != nil { 420 l.Info("repo exists") 421 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 422 return 423 } 424 425 // create atproto record for this repo 426 rkey := tid.TID() 427 repo := &db.Repo{ 428 Did: user.Did, 429 Name: repoName, 430 Knot: domain, 431 Rkey: rkey, 432 Description: description, 433 } 434 435 xrpcClient, err := s.oauth.AuthorizedClient(r) 436 if err != nil { 437 l.Info("PDS write failed", "err", err) 438 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 439 return 440 } 441 442 createdAt := time.Now().Format(time.RFC3339) 443 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 444 Collection: tangled.RepoNSID, 445 Repo: user.Did, 446 Rkey: rkey, 447 Record: &lexutil.LexiconTypeDecoder{ 448 Val: &tangled.Repo{ 449 Knot: repo.Knot, 450 Name: repoName, 451 CreatedAt: createdAt, 452 Owner: user.Did, 453 }}, 454 }) 455 if err != nil { 456 l.Info("PDS write failed", "err", err) 457 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 458 return 459 } 460 461 aturi := atresp.Uri 462 l = l.With("aturi", aturi) 463 l.Info("wrote to PDS") 464 465 tx, err := s.db.BeginTx(r.Context(), nil) 466 if err != nil { 467 l.Info("txn failed", "err", err) 468 s.pages.Notice(w, "repo", "Failed to save repository information.") 469 return 470 } 471 472 // The rollback function reverts a few things on failure: 473 // - the pending txn 474 // - the ACLs 475 // - the atproto record created 476 rollback := func() { 477 err1 := tx.Rollback() 478 err2 := s.enforcer.E.LoadPolicy() 479 err3 := rollbackRecord(context.Background(), aturi, xrpcClient) 480 481 // ignore txn complete errors, this is okay 482 if errors.Is(err1, sql.ErrTxDone) { 483 err1 = nil 484 } 485 486 if errs := errors.Join(err1, err2, err3); errs != nil { 487 l.Error("failed to rollback changes", "errs", errs) 488 return 489 } 490 } 491 defer rollback() 492 493 client, err := s.oauth.ServiceClient( 494 r, 495 oauth.WithService(domain), 496 oauth.WithLxm(tangled.RepoCreateNSID), 497 oauth.WithDev(s.config.Core.Dev), 498 ) 499 if err != nil { 500 l.Error("service auth failed", "err", err) 501 s.pages.Notice(w, "repo", "Failed to reach PDS.") 502 return 503 } 504 505 xe := tangled.RepoCreate( 506 r.Context(), 507 client, 508 &tangled.RepoCreate_Input{ 509 Rkey: rkey, 510 }, 511 ) 512 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 513 l.Error("xrpc error", "xe", xe) 514 s.pages.Notice(w, "repo", err.Error()) 515 return 516 } 517 518 err = db.AddRepo(tx, repo) 519 if err != nil { 520 l.Error("db write failed", "err", err) 521 s.pages.Notice(w, "repo", "Failed to save repository information.") 522 return 523 } 524 525 // acls 526 p, _ := securejoin.SecureJoin(user.Did, repoName) 527 err = s.enforcer.AddRepo(user.Did, domain, p) 528 if err != nil { 529 l.Error("acl setup failed", "err", err) 530 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 531 return 532 } 533 534 err = tx.Commit() 535 if err != nil { 536 l.Error("txn commit failed", "err", err) 537 http.Error(w, err.Error(), http.StatusInternalServerError) 538 return 539 } 540 541 err = s.enforcer.E.SavePolicy() 542 if err != nil { 543 l.Error("acl save failed", "err", err) 544 http.Error(w, err.Error(), http.StatusInternalServerError) 545 return 546 } 547 548 // reset the ATURI because the transaction completed successfully 549 aturi = "" 550 551 s.notifier.NewRepo(r.Context(), repo) 552 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 553 } 554} 555 556// this is used to rollback changes made to the PDS 557// 558// it is a no-op if the provided ATURI is empty 559func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 560 if aturi == "" { 561 return nil 562 } 563 564 parsed := syntax.ATURI(aturi) 565 566 collection := parsed.Collection().String() 567 repo := parsed.Authority().String() 568 rkey := parsed.RecordKey().String() 569 570 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 571 Collection: collection, 572 Repo: repo, 573 Rkey: rkey, 574 }) 575 return err 576}