forked from tangled.org/core
this repo has no description
1package state 2 3import ( 4 "context" 5 "crypto/hmac" 6 "crypto/sha256" 7 "encoding/hex" 8 "fmt" 9 "log" 10 "log/slog" 11 "net/http" 12 "strings" 13 "time" 14 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 lexutil "github.com/bluesky-social/indigo/lex/util" 18 securejoin "github.com/cyphar/filepath-securejoin" 19 "github.com/go-chi/chi/v5" 20 "github.com/posthog/posthog-go" 21 "tangled.sh/tangled.sh/core/api/tangled" 22 "tangled.sh/tangled.sh/core/appview" 23 "tangled.sh/tangled.sh/core/appview/cache" 24 "tangled.sh/tangled.sh/core/appview/cache/session" 25 "tangled.sh/tangled.sh/core/appview/config" 26 "tangled.sh/tangled.sh/core/appview/db" 27 "tangled.sh/tangled.sh/core/appview/idresolver" 28 "tangled.sh/tangled.sh/core/appview/oauth" 29 "tangled.sh/tangled.sh/core/appview/pages" 30 "tangled.sh/tangled.sh/core/appview/reporesolver" 31 "tangled.sh/tangled.sh/core/jetstream" 32 "tangled.sh/tangled.sh/core/knotclient" 33 "tangled.sh/tangled.sh/core/rbac" 34) 35 36type State struct { 37 db *db.DB 38 oauth *oauth.OAuth 39 enforcer *rbac.Enforcer 40 tidClock syntax.TIDClock 41 pages *pages.Pages 42 sess *session.SessionStore 43 idResolver *idresolver.Resolver 44 posthog posthog.Client 45 jc *jetstream.JetstreamClient 46 config *config.Config 47 repoResolver *reporesolver.RepoResolver 48 knotstream *knotclient.EventConsumer 49} 50 51func Make(ctx context.Context, config *config.Config) (*State, error) { 52 d, err := db.Make(config.Core.DbPath) 53 if err != nil { 54 return nil, err 55 } 56 57 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 58 if err != nil { 59 return nil, err 60 } 61 62 clock := syntax.NewTIDClock(0) 63 64 pgs := pages.NewPages(config) 65 66 res, err := idresolver.RedisResolver(config.Redis) 67 if err != nil { 68 log.Printf("failed to create redis resolver: %v", err) 69 res = idresolver.DefaultResolver() 70 } 71 72 cache := cache.New(config.Redis.Addr) 73 sess := session.New(cache) 74 75 oauth := oauth.NewOAuth(config, sess) 76 77 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create posthog client: %w", err) 80 } 81 82 repoResolver := reporesolver.New(config, enforcer, res, d) 83 84 wrapper := db.DbWrapper{d} 85 jc, err := jetstream.NewJetstreamClient( 86 config.Jetstream.Endpoint, 87 "appview", 88 []string{ 89 tangled.GraphFollowNSID, 90 tangled.FeedStarNSID, 91 tangled.PublicKeyNSID, 92 tangled.RepoArtifactNSID, 93 tangled.ActorProfileNSID, 94 }, 95 nil, 96 slog.Default(), 97 wrapper, 98 false, 99 100 // in-memory filter is inapplicalble to appview so 101 // we'll never log dids anyway. 102 false, 103 ) 104 if err != nil { 105 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 106 } 107 err = jc.StartJetstream(ctx, appview.Ingest(wrapper, enforcer)) 108 if err != nil { 109 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 110 } 111 112 knotstream, err := KnotstreamConsumer(ctx, config, d, enforcer, posthog) 113 if err != nil { 114 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 115 } 116 knotstream.Start(ctx) 117 118 state := &State{ 119 d, 120 oauth, 121 enforcer, 122 clock, 123 pgs, 124 sess, 125 res, 126 posthog, 127 jc, 128 config, 129 repoResolver, 130 knotstream, 131 } 132 133 return state, nil 134} 135 136func TID(c *syntax.TIDClock) string { 137 return c.Next().String() 138} 139 140func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 141 user := s.oauth.GetUser(r) 142 143 timeline, err := db.MakeTimeline(s.db) 144 if err != nil { 145 log.Println(err) 146 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 147 } 148 149 var didsToResolve []string 150 for _, ev := range timeline { 151 if ev.Repo != nil { 152 didsToResolve = append(didsToResolve, ev.Repo.Did) 153 if ev.Source != nil { 154 didsToResolve = append(didsToResolve, ev.Source.Did) 155 } 156 } 157 if ev.Follow != nil { 158 didsToResolve = append(didsToResolve, ev.Follow.UserDid, ev.Follow.SubjectDid) 159 } 160 if ev.Star != nil { 161 didsToResolve = append(didsToResolve, ev.Star.StarredByDid, ev.Star.Repo.Did) 162 } 163 } 164 165 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve) 166 didHandleMap := make(map[string]string) 167 for _, identity := range resolvedIds { 168 if !identity.Handle.IsInvalidHandle() { 169 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String()) 170 } else { 171 didHandleMap[identity.DID.String()] = identity.DID.String() 172 } 173 } 174 175 s.pages.Timeline(w, pages.TimelineParams{ 176 LoggedInUser: user, 177 Timeline: timeline, 178 DidHandleMap: didHandleMap, 179 }) 180 181 return 182} 183 184// requires auth 185func (s *State) RegistrationKey(w http.ResponseWriter, r *http.Request) { 186 switch r.Method { 187 case http.MethodGet: 188 // list open registrations under this did 189 190 return 191 case http.MethodPost: 192 session, err := s.oauth.Stores().Get(r, oauth.SessionName) 193 if err != nil || session.IsNew { 194 log.Println("unauthorized attempt to generate registration key") 195 http.Error(w, "Forbidden", http.StatusUnauthorized) 196 return 197 } 198 199 did := session.Values[oauth.SessionDid].(string) 200 201 // check if domain is valid url, and strip extra bits down to just host 202 domain := r.FormValue("domain") 203 if domain == "" { 204 http.Error(w, "Invalid form", http.StatusBadRequest) 205 return 206 } 207 208 key, err := db.GenerateRegistrationKey(s.db, domain, did) 209 210 if err != nil { 211 log.Println(err) 212 http.Error(w, "unable to register this domain", http.StatusNotAcceptable) 213 return 214 } 215 216 w.Write([]byte(key)) 217 } 218} 219 220func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 221 user := chi.URLParam(r, "user") 222 user = strings.TrimPrefix(user, "@") 223 224 if user == "" { 225 w.WriteHeader(http.StatusBadRequest) 226 return 227 } 228 229 id, err := s.idResolver.ResolveIdent(r.Context(), user) 230 if err != nil { 231 w.WriteHeader(http.StatusInternalServerError) 232 return 233 } 234 235 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 236 if err != nil { 237 w.WriteHeader(http.StatusNotFound) 238 return 239 } 240 241 if len(pubKeys) == 0 { 242 w.WriteHeader(http.StatusNotFound) 243 return 244 } 245 246 for _, k := range pubKeys { 247 key := strings.TrimRight(k.Key, "\n") 248 w.Write([]byte(fmt.Sprintln(key))) 249 } 250} 251 252// create a signed request and check if a node responds to that 253func (s *State) InitKnotServer(w http.ResponseWriter, r *http.Request) { 254 user := s.oauth.GetUser(r) 255 256 domain := chi.URLParam(r, "domain") 257 if domain == "" { 258 http.Error(w, "malformed url", http.StatusBadRequest) 259 return 260 } 261 log.Println("checking ", domain) 262 263 secret, err := db.GetRegistrationKey(s.db, domain) 264 if err != nil { 265 log.Printf("no key found for domain %s: %s\n", domain, err) 266 return 267 } 268 269 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev) 270 if err != nil { 271 log.Println("failed to create client to ", domain) 272 } 273 274 resp, err := client.Init(user.Did) 275 if err != nil { 276 w.Write([]byte("no dice")) 277 log.Println("domain was unreachable after 5 seconds") 278 return 279 } 280 281 if resp.StatusCode == http.StatusConflict { 282 log.Println("status conflict", resp.StatusCode) 283 w.Write([]byte("already registered, sorry!")) 284 return 285 } 286 287 if resp.StatusCode != http.StatusNoContent { 288 log.Println("status nok", resp.StatusCode) 289 w.Write([]byte("no dice")) 290 return 291 } 292 293 // verify response mac 294 signature := resp.Header.Get("X-Signature") 295 signatureBytes, err := hex.DecodeString(signature) 296 if err != nil { 297 return 298 } 299 300 expectedMac := hmac.New(sha256.New, []byte(secret)) 301 expectedMac.Write([]byte("ok")) 302 303 if !hmac.Equal(expectedMac.Sum(nil), signatureBytes) { 304 log.Printf("response body signature mismatch: %x\n", signatureBytes) 305 return 306 } 307 308 tx, err := s.db.BeginTx(r.Context(), nil) 309 if err != nil { 310 log.Println("failed to start tx", err) 311 http.Error(w, err.Error(), http.StatusInternalServerError) 312 return 313 } 314 defer func() { 315 tx.Rollback() 316 err = s.enforcer.E.LoadPolicy() 317 if err != nil { 318 log.Println("failed to rollback policies") 319 } 320 }() 321 322 // mark as registered 323 err = db.Register(tx, domain) 324 if err != nil { 325 log.Println("failed to register domain", err) 326 http.Error(w, err.Error(), http.StatusInternalServerError) 327 return 328 } 329 330 // set permissions for this did as owner 331 reg, err := db.RegistrationByDomain(tx, domain) 332 if err != nil { 333 log.Println("failed to register domain", err) 334 http.Error(w, err.Error(), http.StatusInternalServerError) 335 return 336 } 337 338 // add basic acls for this domain 339 err = s.enforcer.AddDomain(domain) 340 if err != nil { 341 log.Println("failed to setup owner of domain", err) 342 http.Error(w, err.Error(), http.StatusInternalServerError) 343 return 344 } 345 346 // add this did as owner of this domain 347 err = s.enforcer.AddOwner(domain, reg.ByDid) 348 if err != nil { 349 log.Println("failed to setup owner of domain", err) 350 http.Error(w, err.Error(), http.StatusInternalServerError) 351 return 352 } 353 354 err = tx.Commit() 355 if err != nil { 356 log.Println("failed to commit changes", err) 357 http.Error(w, err.Error(), http.StatusInternalServerError) 358 return 359 } 360 361 err = s.enforcer.E.SavePolicy() 362 if err != nil { 363 log.Println("failed to update ACLs", err) 364 http.Error(w, err.Error(), http.StatusInternalServerError) 365 return 366 } 367 368 // add this knot to knotstream 369 go s.knotstream.AddSource(context.Background(), knotclient.EventSource{domain}) 370 371 w.Write([]byte("check success")) 372} 373 374func (s *State) KnotServerInfo(w http.ResponseWriter, r *http.Request) { 375 domain := chi.URLParam(r, "domain") 376 if domain == "" { 377 http.Error(w, "malformed url", http.StatusBadRequest) 378 return 379 } 380 381 user := s.oauth.GetUser(r) 382 reg, err := db.RegistrationByDomain(s.db, domain) 383 if err != nil { 384 w.Write([]byte("failed to pull up registration info")) 385 return 386 } 387 388 var members []string 389 if reg.Registered != nil { 390 members, err = s.enforcer.GetUserByRole("server:member", domain) 391 if err != nil { 392 w.Write([]byte("failed to fetch member list")) 393 return 394 } 395 } 396 397 var didsToResolve []string 398 for _, m := range members { 399 didsToResolve = append(didsToResolve, m) 400 } 401 didsToResolve = append(didsToResolve, reg.ByDid) 402 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve) 403 didHandleMap := make(map[string]string) 404 for _, identity := range resolvedIds { 405 if !identity.Handle.IsInvalidHandle() { 406 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String()) 407 } else { 408 didHandleMap[identity.DID.String()] = identity.DID.String() 409 } 410 } 411 412 ok, err := s.enforcer.IsServerOwner(user.Did, domain) 413 isOwner := err == nil && ok 414 415 p := pages.KnotParams{ 416 LoggedInUser: user, 417 DidHandleMap: didHandleMap, 418 Registration: reg, 419 Members: members, 420 IsOwner: isOwner, 421 } 422 423 s.pages.Knot(w, p) 424} 425 426// get knots registered by this user 427func (s *State) Knots(w http.ResponseWriter, r *http.Request) { 428 // for now, this is just pubkeys 429 user := s.oauth.GetUser(r) 430 registrations, err := db.RegistrationsByDid(s.db, user.Did) 431 if err != nil { 432 log.Println(err) 433 } 434 435 s.pages.Knots(w, pages.KnotsParams{ 436 LoggedInUser: user, 437 Registrations: registrations, 438 }) 439} 440 441// list members of domain, requires auth and requires owner status 442func (s *State) ListMembers(w http.ResponseWriter, r *http.Request) { 443 domain := chi.URLParam(r, "domain") 444 if domain == "" { 445 http.Error(w, "malformed url", http.StatusBadRequest) 446 return 447 } 448 449 // list all members for this domain 450 memberDids, err := s.enforcer.GetUserByRole("server:member", domain) 451 if err != nil { 452 w.Write([]byte("failed to fetch member list")) 453 return 454 } 455 456 w.Write([]byte(strings.Join(memberDids, "\n"))) 457 return 458} 459 460// add member to domain, requires auth and requires invite access 461func (s *State) AddMember(w http.ResponseWriter, r *http.Request) { 462 domain := chi.URLParam(r, "domain") 463 if domain == "" { 464 http.Error(w, "malformed url", http.StatusBadRequest) 465 return 466 } 467 468 subjectIdentifier := r.FormValue("subject") 469 if subjectIdentifier == "" { 470 http.Error(w, "malformed form", http.StatusBadRequest) 471 return 472 } 473 474 subjectIdentity, err := s.idResolver.ResolveIdent(r.Context(), subjectIdentifier) 475 if err != nil { 476 w.Write([]byte("failed to resolve member did to a handle")) 477 return 478 } 479 log.Printf("adding %s to %s\n", subjectIdentity.Handle.String(), domain) 480 481 // announce this relation into the firehose, store into owners' pds 482 client, err := s.oauth.AuthorizedClient(r) 483 if err != nil { 484 http.Error(w, "failed to authorize client", http.StatusInternalServerError) 485 return 486 } 487 currentUser := s.oauth.GetUser(r) 488 createdAt := time.Now().Format(time.RFC3339) 489 resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 490 Collection: tangled.KnotMemberNSID, 491 Repo: currentUser.Did, 492 Rkey: appview.TID(), 493 Record: &lexutil.LexiconTypeDecoder{ 494 Val: &tangled.KnotMember{ 495 Subject: subjectIdentity.DID.String(), 496 Domain: domain, 497 CreatedAt: createdAt, 498 }}, 499 }) 500 501 // invalid record 502 if err != nil { 503 log.Printf("failed to create record: %s", err) 504 return 505 } 506 log.Println("created atproto record: ", resp.Uri) 507 508 secret, err := db.GetRegistrationKey(s.db, domain) 509 if err != nil { 510 log.Printf("no key found for domain %s: %s\n", domain, err) 511 return 512 } 513 514 ksClient, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev) 515 if err != nil { 516 log.Println("failed to create client to ", domain) 517 return 518 } 519 520 ksResp, err := ksClient.AddMember(subjectIdentity.DID.String()) 521 if err != nil { 522 log.Printf("failed to make request to %s: %s", domain, err) 523 return 524 } 525 526 if ksResp.StatusCode != http.StatusNoContent { 527 w.Write([]byte(fmt.Sprint("knotserver failed to add member: ", err))) 528 return 529 } 530 531 err = s.enforcer.AddMember(domain, subjectIdentity.DID.String()) 532 if err != nil { 533 w.Write([]byte(fmt.Sprint("failed to add member: ", err))) 534 return 535 } 536 537 w.Write([]byte(fmt.Sprint("added member: ", subjectIdentity.Handle.String()))) 538} 539 540func (s *State) RemoveMember(w http.ResponseWriter, r *http.Request) { 541} 542 543func validateRepoName(name string) error { 544 // check for path traversal attempts 545 if name == "." || name == ".." || 546 strings.Contains(name, "/") || strings.Contains(name, "\\") { 547 return fmt.Errorf("Repository name contains invalid path characters") 548 } 549 550 // check for sequences that could be used for traversal when normalized 551 if strings.Contains(name, "./") || strings.Contains(name, "../") || 552 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 553 return fmt.Errorf("Repository name contains invalid path sequence") 554 } 555 556 // then continue with character validation 557 for _, char := range name { 558 if !((char >= 'a' && char <= 'z') || 559 (char >= 'A' && char <= 'Z') || 560 (char >= '0' && char <= '9') || 561 char == '-' || char == '_' || char == '.') { 562 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 563 } 564 } 565 566 // additional check to prevent multiple sequential dots 567 if strings.Contains(name, "..") { 568 return fmt.Errorf("Repository name cannot contain sequential dots") 569 } 570 571 // if all checks pass 572 return nil 573} 574 575func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 576 switch r.Method { 577 case http.MethodGet: 578 user := s.oauth.GetUser(r) 579 knots, err := s.enforcer.GetDomainsForUser(user.Did) 580 if err != nil { 581 s.pages.Notice(w, "repo", "Invalid user account.") 582 return 583 } 584 585 s.pages.NewRepo(w, pages.NewRepoParams{ 586 LoggedInUser: user, 587 Knots: knots, 588 }) 589 590 case http.MethodPost: 591 user := s.oauth.GetUser(r) 592 593 domain := r.FormValue("domain") 594 if domain == "" { 595 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 596 return 597 } 598 599 repoName := r.FormValue("name") 600 if repoName == "" { 601 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 602 return 603 } 604 605 if err := validateRepoName(repoName); err != nil { 606 s.pages.Notice(w, "repo", err.Error()) 607 return 608 } 609 610 defaultBranch := r.FormValue("branch") 611 if defaultBranch == "" { 612 defaultBranch = "main" 613 } 614 615 description := r.FormValue("description") 616 617 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 618 if err != nil || !ok { 619 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 620 return 621 } 622 623 existingRepo, err := db.GetRepo(s.db, user.Did, repoName) 624 if err == nil && existingRepo != nil { 625 s.pages.Notice(w, "repo", fmt.Sprintf("A repo by this name already exists on %s", existingRepo.Knot)) 626 return 627 } 628 629 secret, err := db.GetRegistrationKey(s.db, domain) 630 if err != nil { 631 s.pages.Notice(w, "repo", fmt.Sprintf("No registration key found for knot %s.", domain)) 632 return 633 } 634 635 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev) 636 if err != nil { 637 s.pages.Notice(w, "repo", "Failed to connect to knot server.") 638 return 639 } 640 641 rkey := appview.TID() 642 repo := &db.Repo{ 643 Did: user.Did, 644 Name: repoName, 645 Knot: domain, 646 Rkey: rkey, 647 Description: description, 648 } 649 650 xrpcClient, err := s.oauth.AuthorizedClient(r) 651 if err != nil { 652 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 653 return 654 } 655 656 createdAt := time.Now().Format(time.RFC3339) 657 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 658 Collection: tangled.RepoNSID, 659 Repo: user.Did, 660 Rkey: rkey, 661 Record: &lexutil.LexiconTypeDecoder{ 662 Val: &tangled.Repo{ 663 Knot: repo.Knot, 664 Name: repoName, 665 CreatedAt: createdAt, 666 Owner: user.Did, 667 }}, 668 }) 669 if err != nil { 670 log.Printf("failed to create record: %s", err) 671 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 672 return 673 } 674 log.Println("created repo record: ", atresp.Uri) 675 676 tx, err := s.db.BeginTx(r.Context(), nil) 677 if err != nil { 678 log.Println(err) 679 s.pages.Notice(w, "repo", "Failed to save repository information.") 680 return 681 } 682 defer func() { 683 tx.Rollback() 684 err = s.enforcer.E.LoadPolicy() 685 if err != nil { 686 log.Println("failed to rollback policies") 687 } 688 }() 689 690 resp, err := client.NewRepo(user.Did, repoName, defaultBranch) 691 if err != nil { 692 s.pages.Notice(w, "repo", "Failed to create repository on knot server.") 693 return 694 } 695 696 switch resp.StatusCode { 697 case http.StatusConflict: 698 s.pages.Notice(w, "repo", "A repository with that name already exists.") 699 return 700 case http.StatusInternalServerError: 701 s.pages.Notice(w, "repo", "Failed to create repository on knot. Try again later.") 702 case http.StatusNoContent: 703 // continue 704 } 705 706 repo.AtUri = atresp.Uri 707 err = db.AddRepo(tx, repo) 708 if err != nil { 709 log.Println(err) 710 s.pages.Notice(w, "repo", "Failed to save repository information.") 711 return 712 } 713 714 // acls 715 p, _ := securejoin.SecureJoin(user.Did, repoName) 716 err = s.enforcer.AddRepo(user.Did, domain, p) 717 if err != nil { 718 log.Println(err) 719 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 720 return 721 } 722 723 err = tx.Commit() 724 if err != nil { 725 log.Println("failed to commit changes", err) 726 http.Error(w, err.Error(), http.StatusInternalServerError) 727 return 728 } 729 730 err = s.enforcer.E.SavePolicy() 731 if err != nil { 732 log.Println("failed to update ACLs", err) 733 http.Error(w, err.Error(), http.StatusInternalServerError) 734 return 735 } 736 737 if !s.config.Core.Dev { 738 err = s.posthog.Enqueue(posthog.Capture{ 739 DistinctId: user.Did, 740 Event: "new_repo", 741 Properties: posthog.Properties{"repo": repoName, "repo_at": repo.AtUri}, 742 }) 743 if err != nil { 744 log.Println("failed to enqueue posthog event:", err) 745 } 746 } 747 748 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 749 return 750 } 751}