forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "strings" 12 "time" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 securejoin "github.com/cyphar/filepath-securejoin" 18 "github.com/go-chi/chi/v5" 19 "github.com/posthog/posthog-go" 20 "tangled.sh/tangled.sh/core/api/tangled" 21 "tangled.sh/tangled.sh/core/appview" 22 "tangled.sh/tangled.sh/core/appview/cache" 23 "tangled.sh/tangled.sh/core/appview/cache/session" 24 "tangled.sh/tangled.sh/core/appview/config" 25 "tangled.sh/tangled.sh/core/appview/db" 26 "tangled.sh/tangled.sh/core/appview/notify" 27 "tangled.sh/tangled.sh/core/appview/oauth" 28 "tangled.sh/tangled.sh/core/appview/pages" 29 posthogService "tangled.sh/tangled.sh/core/appview/posthog" 30 "tangled.sh/tangled.sh/core/appview/reporesolver" 31 xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient" 32 "tangled.sh/tangled.sh/core/eventconsumer" 33 "tangled.sh/tangled.sh/core/idresolver" 34 "tangled.sh/tangled.sh/core/jetstream" 35 tlog "tangled.sh/tangled.sh/core/log" 36 "tangled.sh/tangled.sh/core/rbac" 37 "tangled.sh/tangled.sh/core/tid" 38 // xrpcerr "tangled.sh/tangled.sh/core/xrpc/errors" 39) 40 41type State struct { 42 db *db.DB 43 notifier notify.Notifier 44 oauth *oauth.OAuth 45 enforcer *rbac.Enforcer 46 pages *pages.Pages 47 sess *session.SessionStore 48 idResolver *idresolver.Resolver 49 posthog posthog.Client 50 jc *jetstream.JetstreamClient 51 config *config.Config 52 repoResolver *reporesolver.RepoResolver 53 knotstream *eventconsumer.Consumer 54 spindlestream *eventconsumer.Consumer 55 logger *slog.Logger 56} 57 58func Make(ctx context.Context, config *config.Config) (*State, error) { 59 d, err := db.Make(config.Core.DbPath) 60 if err != nil { 61 return nil, fmt.Errorf("failed to create db: %w", err) 62 } 63 64 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 65 if err != nil { 66 return nil, fmt.Errorf("failed to create enforcer: %w", err) 67 } 68 69 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 70 if err != nil { 71 log.Printf("failed to create redis resolver: %v", err) 72 res = idresolver.DefaultResolver() 73 } 74 75 pgs := pages.NewPages(config, res) 76 77 cache := cache.New(config.Redis.Addr) 78 sess := session.New(cache) 79 80 oauth := oauth.NewOAuth(config, sess) 81 82 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create posthog client: %w", err) 85 } 86 87 repoResolver := reporesolver.New(config, enforcer, res, d) 88 89 wrapper := db.DbWrapper{d} 90 jc, err := jetstream.NewJetstreamClient( 91 config.Jetstream.Endpoint, 92 "appview", 93 []string{ 94 tangled.GraphFollowNSID, 95 tangled.FeedStarNSID, 96 tangled.PublicKeyNSID, 97 tangled.RepoArtifactNSID, 98 tangled.ActorProfileNSID, 99 tangled.SpindleMemberNSID, 100 tangled.SpindleNSID, 101 tangled.StringNSID, 102 }, 103 nil, 104 slog.Default(), 105 wrapper, 106 false, 107 108 // in-memory filter is inapplicalble to appview so 109 // we'll never log dids anyway. 110 false, 111 ) 112 if err != nil { 113 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 114 } 115 116 ingester := appview.Ingester{ 117 Db: wrapper, 118 Enforcer: enforcer, 119 IdResolver: res, 120 Config: config, 121 Logger: tlog.New("ingester"), 122 } 123 err = jc.StartJetstream(ctx, ingester.Ingest()) 124 if err != nil { 125 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 126 } 127 128 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 129 if err != nil { 130 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 131 } 132 knotstream.Start(ctx) 133 134 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 135 if err != nil { 136 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 137 } 138 spindlestream.Start(ctx) 139 140 var notifiers []notify.Notifier 141 if !config.Core.Dev { 142 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog)) 143 } 144 notifier := notify.NewMergedNotifier(notifiers...) 145 146 state := &State{ 147 d, 148 notifier, 149 oauth, 150 enforcer, 151 pgs, 152 sess, 153 res, 154 posthog, 155 jc, 156 config, 157 repoResolver, 158 knotstream, 159 spindlestream, 160 slog.Default(), 161 } 162 163 return state, nil 164} 165 166func (s *State) Favicon(w http.ResponseWriter, r *http.Request) { 167 w.Header().Set("Content-Type", "image/svg+xml") 168 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year 169 w.Header().Set("ETag", `"favicon-svg-v1"`) 170 171 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` { 172 w.WriteHeader(http.StatusNotModified) 173 return 174 } 175 176 s.pages.Favicon(w) 177} 178 179func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 180 user := s.oauth.GetUser(r) 181 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 182 LoggedInUser: user, 183 }) 184} 185 186func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 187 user := s.oauth.GetUser(r) 188 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 189 LoggedInUser: user, 190 }) 191} 192 193func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 194 user := s.oauth.GetUser(r) 195 196 timeline, err := db.MakeTimeline(s.db) 197 if err != nil { 198 log.Println(err) 199 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 200 } 201 202 repos, err := db.GetTopStarredReposLastWeek(s.db) 203 if err != nil { 204 log.Println(err) 205 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 206 return 207 } 208 209 s.pages.Timeline(w, pages.TimelineParams{ 210 LoggedInUser: user, 211 Timeline: timeline, 212 Repos: repos, 213 }) 214} 215 216func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 217 user := chi.URLParam(r, "user") 218 user = strings.TrimPrefix(user, "@") 219 220 if user == "" { 221 w.WriteHeader(http.StatusBadRequest) 222 return 223 } 224 225 id, err := s.idResolver.ResolveIdent(r.Context(), user) 226 if err != nil { 227 w.WriteHeader(http.StatusInternalServerError) 228 return 229 } 230 231 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 232 if err != nil { 233 w.WriteHeader(http.StatusNotFound) 234 return 235 } 236 237 if len(pubKeys) == 0 { 238 w.WriteHeader(http.StatusNotFound) 239 return 240 } 241 242 for _, k := range pubKeys { 243 key := strings.TrimRight(k.Key, "\n") 244 w.Write([]byte(fmt.Sprintln(key))) 245 } 246} 247 248func validateRepoName(name string) error { 249 // check for path traversal attempts 250 if name == "." || name == ".." || 251 strings.Contains(name, "/") || strings.Contains(name, "\\") { 252 return fmt.Errorf("Repository name contains invalid path characters") 253 } 254 255 // check for sequences that could be used for traversal when normalized 256 if strings.Contains(name, "./") || strings.Contains(name, "../") || 257 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 258 return fmt.Errorf("Repository name contains invalid path sequence") 259 } 260 261 // then continue with character validation 262 for _, char := range name { 263 if !((char >= 'a' && char <= 'z') || 264 (char >= 'A' && char <= 'Z') || 265 (char >= '0' && char <= '9') || 266 char == '-' || char == '_' || char == '.') { 267 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 268 } 269 } 270 271 // additional check to prevent multiple sequential dots 272 if strings.Contains(name, "..") { 273 return fmt.Errorf("Repository name cannot contain sequential dots") 274 } 275 276 // if all checks pass 277 return nil 278} 279 280func stripGitExt(name string) string { 281 return strings.TrimSuffix(name, ".git") 282} 283 284func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 285 switch r.Method { 286 case http.MethodGet: 287 user := s.oauth.GetUser(r) 288 knots, err := s.enforcer.GetKnotsForUser(user.Did) 289 if err != nil { 290 s.pages.Notice(w, "repo", "Invalid user account.") 291 return 292 } 293 294 s.pages.NewRepo(w, pages.NewRepoParams{ 295 LoggedInUser: user, 296 Knots: knots, 297 }) 298 299 case http.MethodPost: 300 l := s.logger.With("handler", "NewRepo") 301 302 user := s.oauth.GetUser(r) 303 l = l.With("did", user.Did) 304 l = l.With("handle", user.Handle) 305 306 // form validation 307 domain := r.FormValue("domain") 308 if domain == "" { 309 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 310 return 311 } 312 l = l.With("knot", domain) 313 314 repoName := r.FormValue("name") 315 if repoName == "" { 316 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 317 return 318 } 319 320 if err := validateRepoName(repoName); err != nil { 321 s.pages.Notice(w, "repo", err.Error()) 322 return 323 } 324 repoName = stripGitExt(repoName) 325 l = l.With("repoName", repoName) 326 327 defaultBranch := r.FormValue("branch") 328 if defaultBranch == "" { 329 defaultBranch = "main" 330 } 331 l = l.With("defaultBranch", defaultBranch) 332 333 description := r.FormValue("description") 334 335 // ACL validation 336 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 337 if err != nil || !ok { 338 l.Info("unauthorized") 339 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 340 return 341 } 342 343 // Check for existing repos 344 existingRepo, err := db.GetRepo(s.db, user.Did, repoName) 345 if err == nil && existingRepo != nil { 346 l.Info("repo exists") 347 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 348 return 349 } 350 351 // create atproto record for this repo 352 rkey := tid.TID() 353 repo := &db.Repo{ 354 Did: user.Did, 355 Name: repoName, 356 Knot: domain, 357 Rkey: rkey, 358 Description: description, 359 } 360 361 xrpcClient, err := s.oauth.AuthorizedClient(r) 362 if err != nil { 363 l.Info("PDS write failed", "err", err) 364 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 365 return 366 } 367 368 createdAt := time.Now().Format(time.RFC3339) 369 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 370 Collection: tangled.RepoNSID, 371 Repo: user.Did, 372 Rkey: rkey, 373 Record: &lexutil.LexiconTypeDecoder{ 374 Val: &tangled.Repo{ 375 Knot: repo.Knot, 376 Name: repoName, 377 CreatedAt: createdAt, 378 Owner: user.Did, 379 }}, 380 }) 381 if err != nil { 382 l.Info("PDS write failed", "err", err) 383 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 384 return 385 } 386 387 aturi := atresp.Uri 388 l = l.With("aturi", aturi) 389 l.Info("wrote to PDS") 390 391 tx, err := s.db.BeginTx(r.Context(), nil) 392 if err != nil { 393 l.Info("txn failed", "err", err) 394 s.pages.Notice(w, "repo", "Failed to save repository information.") 395 return 396 } 397 398 // The rollback function reverts a few things on failure: 399 // - the pending txn 400 // - the ACLs 401 // - the atproto record created 402 rollback := func() { 403 err1 := tx.Rollback() 404 err2 := s.enforcer.E.LoadPolicy() 405 err3 := rollbackRecord(context.Background(), aturi, xrpcClient) 406 407 // ignore txn complete errors, this is okay 408 if errors.Is(err1, sql.ErrTxDone) { 409 err1 = nil 410 } 411 412 if errs := errors.Join(err1, err2, err3); errs != nil { 413 l.Error("failed to rollback changes", "errs", errs) 414 return 415 } 416 } 417 defer rollback() 418 419 client, err := s.oauth.ServiceClient( 420 r, 421 oauth.WithService(domain), 422 oauth.WithLxm(tangled.RepoCreateNSID), 423 oauth.WithDev(s.config.Core.Dev), 424 ) 425 if err != nil { 426 l.Error("service auth failed", "err", err) 427 s.pages.Notice(w, "repo", "Failed to reach PDS.") 428 return 429 } 430 431 xe := tangled.RepoCreate( 432 r.Context(), 433 client, 434 &tangled.RepoCreate_Input{ 435 Rkey: rkey, 436 }, 437 ) 438 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 439 l.Error("xrpc error", "xe", xe) 440 s.pages.Notice(w, "repo", err.Error()) 441 return 442 } 443 444 err = db.AddRepo(tx, repo) 445 if err != nil { 446 l.Error("db write failed", "err", err) 447 s.pages.Notice(w, "repo", "Failed to save repository information.") 448 return 449 } 450 451 // acls 452 p, _ := securejoin.SecureJoin(user.Did, repoName) 453 err = s.enforcer.AddRepo(user.Did, domain, p) 454 if err != nil { 455 l.Error("acl setup failed", "err", err) 456 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 457 return 458 } 459 460 err = tx.Commit() 461 if err != nil { 462 l.Error("txn commit failed", "err", err) 463 http.Error(w, err.Error(), http.StatusInternalServerError) 464 return 465 } 466 467 err = s.enforcer.E.SavePolicy() 468 if err != nil { 469 l.Error("acl save failed", "err", err) 470 http.Error(w, err.Error(), http.StatusInternalServerError) 471 return 472 } 473 474 // reset the ATURI because the transaction completed successfully 475 aturi = "" 476 477 s.notifier.NewRepo(r.Context(), repo) 478 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 479 } 480} 481 482// this is used to rollback changes made to the PDS 483// 484// it is a no-op if the provided ATURI is empty 485func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 486 if aturi == "" { 487 return nil 488 } 489 490 parsed := syntax.ATURI(aturi) 491 492 collection := parsed.Collection().String() 493 repo := parsed.Authority().String() 494 rkey := parsed.RecordKey().String() 495 496 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 497 Collection: collection, 498 Repo: repo, 499 Rkey: rkey, 500 }) 501 return err 502}