forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "crypto/hmac" 6 "crypto/sha256" 7 "encoding/hex" 8 "fmt" 9 "log" 10 "log/slog" 11 "net/http" 12 "strings" 13 "time" 14 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 lexutil "github.com/bluesky-social/indigo/lex/util" 18 securejoin "github.com/cyphar/filepath-securejoin" 19 "github.com/go-chi/chi/v5" 20 "github.com/posthog/posthog-go" 21 "tangled.sh/tangled.sh/core/api/tangled" 22 "tangled.sh/tangled.sh/core/appview" 23 "tangled.sh/tangled.sh/core/appview/cache" 24 "tangled.sh/tangled.sh/core/appview/cache/session" 25 "tangled.sh/tangled.sh/core/appview/config" 26 "tangled.sh/tangled.sh/core/appview/db" 27 "tangled.sh/tangled.sh/core/appview/idresolver" 28 "tangled.sh/tangled.sh/core/appview/oauth" 29 "tangled.sh/tangled.sh/core/appview/pages" 30 "tangled.sh/tangled.sh/core/appview/reporesolver" 31 "tangled.sh/tangled.sh/core/eventconsumer" 32 "tangled.sh/tangled.sh/core/jetstream" 33 "tangled.sh/tangled.sh/core/knotclient" 34 "tangled.sh/tangled.sh/core/rbac" 35) 36 37type State struct { 38 db *db.DB 39 oauth *oauth.OAuth 40 enforcer *rbac.Enforcer 41 tidClock syntax.TIDClock 42 pages *pages.Pages 43 sess *session.SessionStore 44 idResolver *idresolver.Resolver 45 posthog posthog.Client 46 jc *jetstream.JetstreamClient 47 config *config.Config 48 repoResolver *reporesolver.RepoResolver 49 knotstream *eventconsumer.Consumer 50 spindlestream *eventconsumer.Consumer 51} 52 53func Make(ctx context.Context, config *config.Config) (*State, error) { 54 d, err := db.Make(config.Core.DbPath) 55 if err != nil { 56 return nil, err 57 } 58 59 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 60 if err != nil { 61 return nil, err 62 } 63 64 clock := syntax.NewTIDClock(0) 65 66 pgs := pages.NewPages(config) 67 68 res, err := idresolver.RedisResolver(config.Redis) 69 if err != nil { 70 log.Printf("failed to create redis resolver: %v", err) 71 res = idresolver.DefaultResolver() 72 } 73 74 cache := cache.New(config.Redis.Addr) 75 sess := session.New(cache) 76 77 oauth := oauth.NewOAuth(config, sess) 78 79 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create posthog client: %w", err) 82 } 83 84 repoResolver := reporesolver.New(config, enforcer, res, d) 85 86 wrapper := db.DbWrapper{d} 87 jc, err := jetstream.NewJetstreamClient( 88 config.Jetstream.Endpoint, 89 "appview", 90 []string{ 91 tangled.GraphFollowNSID, 92 tangled.FeedStarNSID, 93 tangled.PublicKeyNSID, 94 tangled.RepoArtifactNSID, 95 tangled.ActorProfileNSID, 96 }, 97 nil, 98 slog.Default(), 99 wrapper, 100 false, 101 102 // in-memory filter is inapplicalble to appview so 103 // we'll never log dids anyway. 104 false, 105 ) 106 if err != nil { 107 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 108 } 109 err = jc.StartJetstream(ctx, appview.Ingest(wrapper, enforcer)) 110 if err != nil { 111 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 112 } 113 114 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 115 if err != nil { 116 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 117 } 118 knotstream.Start(ctx) 119 120 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 121 if err != nil { 122 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 123 } 124 spindlestream.Start(ctx) 125 126 state := &State{ 127 d, 128 oauth, 129 enforcer, 130 clock, 131 pgs, 132 sess, 133 res, 134 posthog, 135 jc, 136 config, 137 repoResolver, 138 knotstream, 139 spindlestream, 140 } 141 142 return state, nil 143} 144 145func TID(c *syntax.TIDClock) string { 146 return c.Next().String() 147} 148 149func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 150 user := s.oauth.GetUser(r) 151 152 timeline, err := db.MakeTimeline(s.db) 153 if err != nil { 154 log.Println(err) 155 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 156 } 157 158 var didsToResolve []string 159 for _, ev := range timeline { 160 if ev.Repo != nil { 161 didsToResolve = append(didsToResolve, ev.Repo.Did) 162 if ev.Source != nil { 163 didsToResolve = append(didsToResolve, ev.Source.Did) 164 } 165 } 166 if ev.Follow != nil { 167 didsToResolve = append(didsToResolve, ev.Follow.UserDid, ev.Follow.SubjectDid) 168 } 169 if ev.Star != nil { 170 didsToResolve = append(didsToResolve, ev.Star.StarredByDid, ev.Star.Repo.Did) 171 } 172 } 173 174 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve) 175 didHandleMap := make(map[string]string) 176 for _, identity := range resolvedIds { 177 if !identity.Handle.IsInvalidHandle() { 178 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String()) 179 } else { 180 didHandleMap[identity.DID.String()] = identity.DID.String() 181 } 182 } 183 184 s.pages.Timeline(w, pages.TimelineParams{ 185 LoggedInUser: user, 186 Timeline: timeline, 187 DidHandleMap: didHandleMap, 188 }) 189 190 return 191} 192 193// requires auth 194func (s *State) RegistrationKey(w http.ResponseWriter, r *http.Request) { 195 switch r.Method { 196 case http.MethodGet: 197 // list open registrations under this did 198 199 return 200 case http.MethodPost: 201 session, err := s.oauth.Stores().Get(r, oauth.SessionName) 202 if err != nil || session.IsNew { 203 log.Println("unauthorized attempt to generate registration key") 204 http.Error(w, "Forbidden", http.StatusUnauthorized) 205 return 206 } 207 208 did := session.Values[oauth.SessionDid].(string) 209 210 // check if domain is valid url, and strip extra bits down to just host 211 domain := r.FormValue("domain") 212 if domain == "" { 213 http.Error(w, "Invalid form", http.StatusBadRequest) 214 return 215 } 216 217 key, err := db.GenerateRegistrationKey(s.db, domain, did) 218 219 if err != nil { 220 log.Println(err) 221 http.Error(w, "unable to register this domain", http.StatusNotAcceptable) 222 return 223 } 224 225 w.Write([]byte(key)) 226 } 227} 228 229func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 230 user := chi.URLParam(r, "user") 231 user = strings.TrimPrefix(user, "@") 232 233 if user == "" { 234 w.WriteHeader(http.StatusBadRequest) 235 return 236 } 237 238 id, err := s.idResolver.ResolveIdent(r.Context(), user) 239 if err != nil { 240 w.WriteHeader(http.StatusInternalServerError) 241 return 242 } 243 244 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 245 if err != nil { 246 w.WriteHeader(http.StatusNotFound) 247 return 248 } 249 250 if len(pubKeys) == 0 { 251 w.WriteHeader(http.StatusNotFound) 252 return 253 } 254 255 for _, k := range pubKeys { 256 key := strings.TrimRight(k.Key, "\n") 257 w.Write([]byte(fmt.Sprintln(key))) 258 } 259} 260 261// create a signed request and check if a node responds to that 262func (s *State) InitKnotServer(w http.ResponseWriter, r *http.Request) { 263 user := s.oauth.GetUser(r) 264 265 domain := chi.URLParam(r, "domain") 266 if domain == "" { 267 http.Error(w, "malformed url", http.StatusBadRequest) 268 return 269 } 270 log.Println("checking ", domain) 271 272 secret, err := db.GetRegistrationKey(s.db, domain) 273 if err != nil { 274 log.Printf("no key found for domain %s: %s\n", domain, err) 275 return 276 } 277 278 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev) 279 if err != nil { 280 log.Println("failed to create client to ", domain) 281 } 282 283 resp, err := client.Init(user.Did) 284 if err != nil { 285 w.Write([]byte("no dice")) 286 log.Println("domain was unreachable after 5 seconds") 287 return 288 } 289 290 if resp.StatusCode == http.StatusConflict { 291 log.Println("status conflict", resp.StatusCode) 292 w.Write([]byte("already registered, sorry!")) 293 return 294 } 295 296 if resp.StatusCode != http.StatusNoContent { 297 log.Println("status nok", resp.StatusCode) 298 w.Write([]byte("no dice")) 299 return 300 } 301 302 // verify response mac 303 signature := resp.Header.Get("X-Signature") 304 signatureBytes, err := hex.DecodeString(signature) 305 if err != nil { 306 return 307 } 308 309 expectedMac := hmac.New(sha256.New, []byte(secret)) 310 expectedMac.Write([]byte("ok")) 311 312 if !hmac.Equal(expectedMac.Sum(nil), signatureBytes) { 313 log.Printf("response body signature mismatch: %x\n", signatureBytes) 314 return 315 } 316 317 tx, err := s.db.BeginTx(r.Context(), nil) 318 if err != nil { 319 log.Println("failed to start tx", err) 320 http.Error(w, err.Error(), http.StatusInternalServerError) 321 return 322 } 323 defer func() { 324 tx.Rollback() 325 err = s.enforcer.E.LoadPolicy() 326 if err != nil { 327 log.Println("failed to rollback policies") 328 } 329 }() 330 331 // mark as registered 332 err = db.Register(tx, domain) 333 if err != nil { 334 log.Println("failed to register domain", err) 335 http.Error(w, err.Error(), http.StatusInternalServerError) 336 return 337 } 338 339 // set permissions for this did as owner 340 reg, err := db.RegistrationByDomain(tx, domain) 341 if err != nil { 342 log.Println("failed to register domain", err) 343 http.Error(w, err.Error(), http.StatusInternalServerError) 344 return 345 } 346 347 // add basic acls for this domain 348 err = s.enforcer.AddKnot(domain) 349 if err != nil { 350 log.Println("failed to setup owner of domain", err) 351 http.Error(w, err.Error(), http.StatusInternalServerError) 352 return 353 } 354 355 // add this did as owner of this domain 356 err = s.enforcer.AddKnotOwner(domain, reg.ByDid) 357 if err != nil { 358 log.Println("failed to setup owner of domain", err) 359 http.Error(w, err.Error(), http.StatusInternalServerError) 360 return 361 } 362 363 err = tx.Commit() 364 if err != nil { 365 log.Println("failed to commit changes", err) 366 http.Error(w, err.Error(), http.StatusInternalServerError) 367 return 368 } 369 370 err = s.enforcer.E.SavePolicy() 371 if err != nil { 372 log.Println("failed to update ACLs", err) 373 http.Error(w, err.Error(), http.StatusInternalServerError) 374 return 375 } 376 377 // add this knot to knotstream 378 go s.knotstream.AddSource( 379 context.Background(), 380 eventconsumer.NewKnotSource(domain), 381 ) 382 383 w.Write([]byte("check success")) 384} 385 386func (s *State) KnotServerInfo(w http.ResponseWriter, r *http.Request) { 387 domain := chi.URLParam(r, "domain") 388 if domain == "" { 389 http.Error(w, "malformed url", http.StatusBadRequest) 390 return 391 } 392 393 user := s.oauth.GetUser(r) 394 reg, err := db.RegistrationByDomain(s.db, domain) 395 if err != nil { 396 w.Write([]byte("failed to pull up registration info")) 397 return 398 } 399 400 var members []string 401 if reg.Registered != nil { 402 members, err = s.enforcer.GetUserByRole("server:member", domain) 403 if err != nil { 404 w.Write([]byte("failed to fetch member list")) 405 return 406 } 407 } 408 409 var didsToResolve []string 410 for _, m := range members { 411 didsToResolve = append(didsToResolve, m) 412 } 413 didsToResolve = append(didsToResolve, reg.ByDid) 414 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve) 415 didHandleMap := make(map[string]string) 416 for _, identity := range resolvedIds { 417 if !identity.Handle.IsInvalidHandle() { 418 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String()) 419 } else { 420 didHandleMap[identity.DID.String()] = identity.DID.String() 421 } 422 } 423 424 ok, err := s.enforcer.IsKnotOwner(user.Did, domain) 425 isOwner := err == nil && ok 426 427 p := pages.KnotParams{ 428 LoggedInUser: user, 429 DidHandleMap: didHandleMap, 430 Registration: reg, 431 Members: members, 432 IsOwner: isOwner, 433 } 434 435 s.pages.Knot(w, p) 436} 437 438// get knots registered by this user 439func (s *State) Knots(w http.ResponseWriter, r *http.Request) { 440 // for now, this is just pubkeys 441 user := s.oauth.GetUser(r) 442 registrations, err := db.RegistrationsByDid(s.db, user.Did) 443 if err != nil { 444 log.Println(err) 445 } 446 447 s.pages.Knots(w, pages.KnotsParams{ 448 LoggedInUser: user, 449 Registrations: registrations, 450 }) 451} 452 453// list members of domain, requires auth and requires owner status 454func (s *State) ListMembers(w http.ResponseWriter, r *http.Request) { 455 domain := chi.URLParam(r, "domain") 456 if domain == "" { 457 http.Error(w, "malformed url", http.StatusBadRequest) 458 return 459 } 460 461 // list all members for this domain 462 memberDids, err := s.enforcer.GetUserByRole("server:member", domain) 463 if err != nil { 464 w.Write([]byte("failed to fetch member list")) 465 return 466 } 467 468 w.Write([]byte(strings.Join(memberDids, "\n"))) 469 return 470} 471 472// add member to domain, requires auth and requires invite access 473func (s *State) AddMember(w http.ResponseWriter, r *http.Request) { 474 domain := chi.URLParam(r, "domain") 475 if domain == "" { 476 http.Error(w, "malformed url", http.StatusBadRequest) 477 return 478 } 479 480 subjectIdentifier := r.FormValue("subject") 481 if subjectIdentifier == "" { 482 http.Error(w, "malformed form", http.StatusBadRequest) 483 return 484 } 485 486 subjectIdentity, err := s.idResolver.ResolveIdent(r.Context(), subjectIdentifier) 487 if err != nil { 488 w.Write([]byte("failed to resolve member did to a handle")) 489 return 490 } 491 log.Printf("adding %s to %s\n", subjectIdentity.Handle.String(), domain) 492 493 // announce this relation into the firehose, store into owners' pds 494 client, err := s.oauth.AuthorizedClient(r) 495 if err != nil { 496 http.Error(w, "failed to authorize client", http.StatusInternalServerError) 497 return 498 } 499 currentUser := s.oauth.GetUser(r) 500 createdAt := time.Now().Format(time.RFC3339) 501 resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 502 Collection: tangled.KnotMemberNSID, 503 Repo: currentUser.Did, 504 Rkey: appview.TID(), 505 Record: &lexutil.LexiconTypeDecoder{ 506 Val: &tangled.KnotMember{ 507 Subject: subjectIdentity.DID.String(), 508 Domain: domain, 509 CreatedAt: createdAt, 510 }}, 511 }) 512 513 // invalid record 514 if err != nil { 515 log.Printf("failed to create record: %s", err) 516 return 517 } 518 log.Println("created atproto record: ", resp.Uri) 519 520 secret, err := db.GetRegistrationKey(s.db, domain) 521 if err != nil { 522 log.Printf("no key found for domain %s: %s\n", domain, err) 523 return 524 } 525 526 ksClient, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev) 527 if err != nil { 528 log.Println("failed to create client to ", domain) 529 return 530 } 531 532 ksResp, err := ksClient.AddMember(subjectIdentity.DID.String()) 533 if err != nil { 534 log.Printf("failed to make request to %s: %s", domain, err) 535 return 536 } 537 538 if ksResp.StatusCode != http.StatusNoContent { 539 w.Write([]byte(fmt.Sprint("knotserver failed to add member: ", err))) 540 return 541 } 542 543 err = s.enforcer.AddKnotMember(domain, subjectIdentity.DID.String()) 544 if err != nil { 545 w.Write([]byte(fmt.Sprint("failed to add member: ", err))) 546 return 547 } 548 549 w.Write([]byte(fmt.Sprint("added member: ", subjectIdentity.Handle.String()))) 550} 551 552func (s *State) RemoveMember(w http.ResponseWriter, r *http.Request) { 553} 554 555func validateRepoName(name string) error { 556 // check for path traversal attempts 557 if name == "." || name == ".." || 558 strings.Contains(name, "/") || strings.Contains(name, "\\") { 559 return fmt.Errorf("Repository name contains invalid path characters") 560 } 561 562 // check for sequences that could be used for traversal when normalized 563 if strings.Contains(name, "./") || strings.Contains(name, "../") || 564 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 565 return fmt.Errorf("Repository name contains invalid path sequence") 566 } 567 568 // then continue with character validation 569 for _, char := range name { 570 if !((char >= 'a' && char <= 'z') || 571 (char >= 'A' && char <= 'Z') || 572 (char >= '0' && char <= '9') || 573 char == '-' || char == '_' || char == '.') { 574 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 575 } 576 } 577 578 // additional check to prevent multiple sequential dots 579 if strings.Contains(name, "..") { 580 return fmt.Errorf("Repository name cannot contain sequential dots") 581 } 582 583 // if all checks pass 584 return nil 585} 586 587func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 588 switch r.Method { 589 case http.MethodGet: 590 user := s.oauth.GetUser(r) 591 knots, err := s.enforcer.GetKnotsForUser(user.Did) 592 if err != nil { 593 s.pages.Notice(w, "repo", "Invalid user account.") 594 return 595 } 596 597 s.pages.NewRepo(w, pages.NewRepoParams{ 598 LoggedInUser: user, 599 Knots: knots, 600 }) 601 602 case http.MethodPost: 603 user := s.oauth.GetUser(r) 604 605 domain := r.FormValue("domain") 606 if domain == "" { 607 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 608 return 609 } 610 611 repoName := r.FormValue("name") 612 if repoName == "" { 613 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 614 return 615 } 616 617 if err := validateRepoName(repoName); err != nil { 618 s.pages.Notice(w, "repo", err.Error()) 619 return 620 } 621 622 defaultBranch := r.FormValue("branch") 623 if defaultBranch == "" { 624 defaultBranch = "main" 625 } 626 627 description := r.FormValue("description") 628 629 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 630 if err != nil || !ok { 631 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 632 return 633 } 634 635 existingRepo, err := db.GetRepo(s.db, user.Did, repoName) 636 if err == nil && existingRepo != nil { 637 s.pages.Notice(w, "repo", fmt.Sprintf("A repo by this name already exists on %s", existingRepo.Knot)) 638 return 639 } 640 641 secret, err := db.GetRegistrationKey(s.db, domain) 642 if err != nil { 643 s.pages.Notice(w, "repo", fmt.Sprintf("No registration key found for knot %s.", domain)) 644 return 645 } 646 647 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev) 648 if err != nil { 649 s.pages.Notice(w, "repo", "Failed to connect to knot server.") 650 return 651 } 652 653 rkey := appview.TID() 654 repo := &db.Repo{ 655 Did: user.Did, 656 Name: repoName, 657 Knot: domain, 658 Rkey: rkey, 659 Description: description, 660 } 661 662 xrpcClient, err := s.oauth.AuthorizedClient(r) 663 if err != nil { 664 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 665 return 666 } 667 668 createdAt := time.Now().Format(time.RFC3339) 669 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 670 Collection: tangled.RepoNSID, 671 Repo: user.Did, 672 Rkey: rkey, 673 Record: &lexutil.LexiconTypeDecoder{ 674 Val: &tangled.Repo{ 675 Knot: repo.Knot, 676 Name: repoName, 677 CreatedAt: createdAt, 678 Owner: user.Did, 679 }}, 680 }) 681 if err != nil { 682 log.Printf("failed to create record: %s", err) 683 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 684 return 685 } 686 log.Println("created repo record: ", atresp.Uri) 687 688 tx, err := s.db.BeginTx(r.Context(), nil) 689 if err != nil { 690 log.Println(err) 691 s.pages.Notice(w, "repo", "Failed to save repository information.") 692 return 693 } 694 defer func() { 695 tx.Rollback() 696 err = s.enforcer.E.LoadPolicy() 697 if err != nil { 698 log.Println("failed to rollback policies") 699 } 700 }() 701 702 resp, err := client.NewRepo(user.Did, repoName, defaultBranch) 703 if err != nil { 704 s.pages.Notice(w, "repo", "Failed to create repository on knot server.") 705 return 706 } 707 708 switch resp.StatusCode { 709 case http.StatusConflict: 710 s.pages.Notice(w, "repo", "A repository with that name already exists.") 711 return 712 case http.StatusInternalServerError: 713 s.pages.Notice(w, "repo", "Failed to create repository on knot. Try again later.") 714 case http.StatusNoContent: 715 // continue 716 } 717 718 repo.AtUri = atresp.Uri 719 err = db.AddRepo(tx, repo) 720 if err != nil { 721 log.Println(err) 722 s.pages.Notice(w, "repo", "Failed to save repository information.") 723 return 724 } 725 726 // acls 727 p, _ := securejoin.SecureJoin(user.Did, repoName) 728 err = s.enforcer.AddRepo(user.Did, domain, p) 729 if err != nil { 730 log.Println(err) 731 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 732 return 733 } 734 735 err = tx.Commit() 736 if err != nil { 737 log.Println("failed to commit changes", err) 738 http.Error(w, err.Error(), http.StatusInternalServerError) 739 return 740 } 741 742 err = s.enforcer.E.SavePolicy() 743 if err != nil { 744 log.Println("failed to update ACLs", err) 745 http.Error(w, err.Error(), http.StatusInternalServerError) 746 return 747 } 748 749 if !s.config.Core.Dev { 750 err = s.posthog.Enqueue(posthog.Capture{ 751 DistinctId: user.Did, 752 Event: "new_repo", 753 Properties: posthog.Properties{"repo": repoName, "repo_at": repo.AtUri}, 754 }) 755 if err != nil { 756 log.Println("failed to enqueue posthog event:", err) 757 } 758 } 759 760 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 761 return 762 } 763}