forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/notify" 20 "tangled.org/core/appview/oauth" 21 "tangled.org/core/appview/pages" 22 "tangled.org/core/appview/reporesolver" 23 "tangled.org/core/appview/validator" 24 xrpcclient "tangled.org/core/appview/xrpcclient" 25 "tangled.org/core/eventconsumer" 26 "tangled.org/core/idresolver" 27 "tangled.org/core/rbac" 28 "tangled.org/core/tid" 29 "tangled.org/core/xrpc/serviceauth" 30 31 comatproto "github.com/bluesky-social/indigo/api/atproto" 32 atpclient "github.com/bluesky-social/indigo/atproto/client" 33 "github.com/bluesky-social/indigo/atproto/syntax" 34 lexutil "github.com/bluesky-social/indigo/lex/util" 35 securejoin "github.com/cyphar/filepath-securejoin" 36 "github.com/go-chi/chi/v5" 37) 38 39type Repo struct { 40 repoResolver *reporesolver.RepoResolver 41 idResolver *idresolver.Resolver 42 config *config.Config 43 oauth *oauth.OAuth 44 pages *pages.Pages 45 spindlestream *eventconsumer.Consumer 46 db *db.DB 47 enforcer *rbac.Enforcer 48 notifier notify.Notifier 49 logger *slog.Logger 50 serviceAuth *serviceauth.ServiceAuth 51 validator *validator.Validator 52} 53 54func New( 55 oauth *oauth.OAuth, 56 repoResolver *reporesolver.RepoResolver, 57 pages *pages.Pages, 58 spindlestream *eventconsumer.Consumer, 59 idResolver *idresolver.Resolver, 60 db *db.DB, 61 config *config.Config, 62 notifier notify.Notifier, 63 enforcer *rbac.Enforcer, 64 logger *slog.Logger, 65 validator *validator.Validator, 66) *Repo { 67 return &Repo{oauth: oauth, 68 repoResolver: repoResolver, 69 pages: pages, 70 idResolver: idResolver, 71 config: config, 72 spindlestream: spindlestream, 73 db: db, 74 notifier: notifier, 75 enforcer: enforcer, 76 logger: logger, 77 validator: validator, 78 } 79} 80 81// isTextualMimeType returns true if the MIME type represents textual content 82 83// modify the spindle configured for this repo 84func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) { 85 user := rp.oauth.GetUser(r) 86 l := rp.logger.With("handler", "EditSpindle") 87 l = l.With("did", user.Did) 88 89 errorId := "operation-error" 90 fail := func(msg string, err error) { 91 l.Error(msg, "err", err) 92 rp.pages.Notice(w, errorId, msg) 93 } 94 95 f, err := rp.repoResolver.Resolve(r) 96 if err != nil { 97 fail("Failed to resolve repo. Try again later", err) 98 return 99 } 100 101 newSpindle := r.FormValue("spindle") 102 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 103 client, err := rp.oauth.AuthorizedClient(r) 104 if err != nil { 105 fail("Failed to authorize. Try again later.", err) 106 return 107 } 108 109 if !removingSpindle { 110 // ensure that this is a valid spindle for this user 111 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did) 112 if err != nil { 113 fail("Failed to find spindles. Try again later.", err) 114 return 115 } 116 117 if !slices.Contains(validSpindles, newSpindle) { 118 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 119 return 120 } 121 } 122 123 newRepo := f.Repo 124 newRepo.Spindle = newSpindle 125 record := newRepo.AsRecord() 126 127 spindlePtr := &newSpindle 128 if removingSpindle { 129 spindlePtr = nil 130 newRepo.Spindle = "" 131 } 132 133 // optimistic update 134 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr) 135 if err != nil { 136 fail("Failed to update spindle. Try again later.", err) 137 return 138 } 139 140 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 141 if err != nil { 142 fail("Failed to update spindle, no record found on PDS.", err) 143 return 144 } 145 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 146 Collection: tangled.RepoNSID, 147 Repo: newRepo.Did, 148 Rkey: newRepo.Rkey, 149 SwapRecord: ex.Cid, 150 Record: &lexutil.LexiconTypeDecoder{ 151 Val: &record, 152 }, 153 }) 154 155 if err != nil { 156 fail("Failed to update spindle, unable to save to PDS.", err) 157 return 158 } 159 160 if !removingSpindle { 161 // add this spindle to spindle stream 162 rp.spindlestream.AddSource( 163 context.Background(), 164 eventconsumer.NewSpindleSource(newSpindle), 165 ) 166 } 167 168 rp.pages.HxRefresh(w) 169} 170 171func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 172 user := rp.oauth.GetUser(r) 173 l := rp.logger.With("handler", "AddLabel") 174 l = l.With("did", user.Did) 175 176 f, err := rp.repoResolver.Resolve(r) 177 if err != nil { 178 l.Error("failed to get repo and knot", "err", err) 179 return 180 } 181 182 errorId := "add-label-error" 183 fail := func(msg string, err error) { 184 l.Error(msg, "err", err) 185 rp.pages.Notice(w, errorId, msg) 186 } 187 188 // get form values for label definition 189 name := r.FormValue("name") 190 concreteType := r.FormValue("valueType") 191 valueFormat := r.FormValue("valueFormat") 192 enumValues := r.FormValue("enumValues") 193 scope := r.Form["scope"] 194 color := r.FormValue("color") 195 multiple := r.FormValue("multiple") == "true" 196 197 var variants []string 198 for part := range strings.SplitSeq(enumValues, ",") { 199 if part = strings.TrimSpace(part); part != "" { 200 variants = append(variants, part) 201 } 202 } 203 204 if concreteType == "" { 205 concreteType = "null" 206 } 207 208 format := models.ValueTypeFormatAny 209 if valueFormat == "did" { 210 format = models.ValueTypeFormatDid 211 } 212 213 valueType := models.ValueType{ 214 Type: models.ConcreteType(concreteType), 215 Format: format, 216 Enum: variants, 217 } 218 219 label := models.LabelDefinition{ 220 Did: user.Did, 221 Rkey: tid.TID(), 222 Name: name, 223 ValueType: valueType, 224 Scope: scope, 225 Color: &color, 226 Multiple: multiple, 227 Created: time.Now(), 228 } 229 if err := rp.validator.ValidateLabelDefinition(&label); err != nil { 230 fail(err.Error(), err) 231 return 232 } 233 234 // announce this relation into the firehose, store into owners' pds 235 client, err := rp.oauth.AuthorizedClient(r) 236 if err != nil { 237 fail(err.Error(), err) 238 return 239 } 240 241 // emit a labelRecord 242 labelRecord := label.AsRecord() 243 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 244 Collection: tangled.LabelDefinitionNSID, 245 Repo: label.Did, 246 Rkey: label.Rkey, 247 Record: &lexutil.LexiconTypeDecoder{ 248 Val: &labelRecord, 249 }, 250 }) 251 // invalid record 252 if err != nil { 253 fail("Failed to write record to PDS.", err) 254 return 255 } 256 257 aturi := resp.Uri 258 l = l.With("at-uri", aturi) 259 l.Info("wrote label record to PDS") 260 261 // update the repo to subscribe to this label 262 newRepo := f.Repo 263 newRepo.Labels = append(newRepo.Labels, aturi) 264 repoRecord := newRepo.AsRecord() 265 266 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 267 if err != nil { 268 fail("Failed to update labels, no record found on PDS.", err) 269 return 270 } 271 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 272 Collection: tangled.RepoNSID, 273 Repo: newRepo.Did, 274 Rkey: newRepo.Rkey, 275 SwapRecord: ex.Cid, 276 Record: &lexutil.LexiconTypeDecoder{ 277 Val: &repoRecord, 278 }, 279 }) 280 if err != nil { 281 fail("Failed to update labels for repo.", err) 282 return 283 } 284 285 tx, err := rp.db.BeginTx(r.Context(), nil) 286 if err != nil { 287 fail("Failed to add label.", err) 288 return 289 } 290 291 rollback := func() { 292 err1 := tx.Rollback() 293 err2 := rollbackRecord(context.Background(), aturi, client) 294 295 // ignore txn complete errors, this is okay 296 if errors.Is(err1, sql.ErrTxDone) { 297 err1 = nil 298 } 299 300 if errs := errors.Join(err1, err2); errs != nil { 301 l.Error("failed to rollback changes", "errs", errs) 302 return 303 } 304 } 305 defer rollback() 306 307 _, err = db.AddLabelDefinition(tx, &label) 308 if err != nil { 309 fail("Failed to add label.", err) 310 return 311 } 312 313 err = db.SubscribeLabel(tx, &models.RepoLabel{ 314 RepoAt: f.RepoAt(), 315 LabelAt: label.AtUri(), 316 }) 317 318 err = tx.Commit() 319 if err != nil { 320 fail("Failed to add label.", err) 321 return 322 } 323 324 // clear aturi when everything is successful 325 aturi = "" 326 327 rp.pages.HxRefresh(w) 328} 329 330func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) { 331 user := rp.oauth.GetUser(r) 332 l := rp.logger.With("handler", "DeleteLabel") 333 l = l.With("did", user.Did) 334 335 f, err := rp.repoResolver.Resolve(r) 336 if err != nil { 337 l.Error("failed to get repo and knot", "err", err) 338 return 339 } 340 341 errorId := "label-operation" 342 fail := func(msg string, err error) { 343 l.Error(msg, "err", err) 344 rp.pages.Notice(w, errorId, msg) 345 } 346 347 // get form values 348 labelId := r.FormValue("label-id") 349 350 label, err := db.GetLabelDefinition(rp.db, db.FilterEq("id", labelId)) 351 if err != nil { 352 fail("Failed to find label definition.", err) 353 return 354 } 355 356 client, err := rp.oauth.AuthorizedClient(r) 357 if err != nil { 358 fail(err.Error(), err) 359 return 360 } 361 362 // delete label record from PDS 363 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 364 Collection: tangled.LabelDefinitionNSID, 365 Repo: label.Did, 366 Rkey: label.Rkey, 367 }) 368 if err != nil { 369 fail("Failed to delete label record from PDS.", err) 370 return 371 } 372 373 // update repo record to remove the label reference 374 newRepo := f.Repo 375 var updated []string 376 removedAt := label.AtUri().String() 377 for _, l := range newRepo.Labels { 378 if l != removedAt { 379 updated = append(updated, l) 380 } 381 } 382 newRepo.Labels = updated 383 repoRecord := newRepo.AsRecord() 384 385 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 386 if err != nil { 387 fail("Failed to update labels, no record found on PDS.", err) 388 return 389 } 390 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 391 Collection: tangled.RepoNSID, 392 Repo: newRepo.Did, 393 Rkey: newRepo.Rkey, 394 SwapRecord: ex.Cid, 395 Record: &lexutil.LexiconTypeDecoder{ 396 Val: &repoRecord, 397 }, 398 }) 399 if err != nil { 400 fail("Failed to update repo record.", err) 401 return 402 } 403 404 // transaction for DB changes 405 tx, err := rp.db.BeginTx(r.Context(), nil) 406 if err != nil { 407 fail("Failed to delete label.", err) 408 return 409 } 410 defer tx.Rollback() 411 412 err = db.UnsubscribeLabel( 413 tx, 414 db.FilterEq("repo_at", f.RepoAt()), 415 db.FilterEq("label_at", removedAt), 416 ) 417 if err != nil { 418 fail("Failed to unsubscribe label.", err) 419 return 420 } 421 422 err = db.DeleteLabelDefinition(tx, db.FilterEq("id", label.Id)) 423 if err != nil { 424 fail("Failed to delete label definition.", err) 425 return 426 } 427 428 err = tx.Commit() 429 if err != nil { 430 fail("Failed to delete label.", err) 431 return 432 } 433 434 // everything succeeded 435 rp.pages.HxRefresh(w) 436} 437 438func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 439 user := rp.oauth.GetUser(r) 440 l := rp.logger.With("handler", "SubscribeLabel") 441 l = l.With("did", user.Did) 442 443 f, err := rp.repoResolver.Resolve(r) 444 if err != nil { 445 l.Error("failed to get repo and knot", "err", err) 446 return 447 } 448 449 if err := r.ParseForm(); err != nil { 450 l.Error("invalid form", "err", err) 451 return 452 } 453 454 errorId := "default-label-operation" 455 fail := func(msg string, err error) { 456 l.Error(msg, "err", err) 457 rp.pages.Notice(w, errorId, msg) 458 } 459 460 labelAts := r.Form["label"] 461 _, err = db.GetLabelDefinitions(rp.db, db.FilterIn("at_uri", labelAts)) 462 if err != nil { 463 fail("Failed to subscribe to label.", err) 464 return 465 } 466 467 newRepo := f.Repo 468 newRepo.Labels = append(newRepo.Labels, labelAts...) 469 470 // dedup 471 slices.Sort(newRepo.Labels) 472 newRepo.Labels = slices.Compact(newRepo.Labels) 473 474 repoRecord := newRepo.AsRecord() 475 476 client, err := rp.oauth.AuthorizedClient(r) 477 if err != nil { 478 fail(err.Error(), err) 479 return 480 } 481 482 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Repo.Did, f.Repo.Rkey) 483 if err != nil { 484 fail("Failed to update labels, no record found on PDS.", err) 485 return 486 } 487 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 488 Collection: tangled.RepoNSID, 489 Repo: newRepo.Did, 490 Rkey: newRepo.Rkey, 491 SwapRecord: ex.Cid, 492 Record: &lexutil.LexiconTypeDecoder{ 493 Val: &repoRecord, 494 }, 495 }) 496 497 tx, err := rp.db.Begin() 498 if err != nil { 499 fail("Failed to subscribe to label.", err) 500 return 501 } 502 defer tx.Rollback() 503 504 for _, l := range labelAts { 505 err = db.SubscribeLabel(tx, &models.RepoLabel{ 506 RepoAt: f.RepoAt(), 507 LabelAt: syntax.ATURI(l), 508 }) 509 if err != nil { 510 fail("Failed to subscribe to label.", err) 511 return 512 } 513 } 514 515 if err := tx.Commit(); err != nil { 516 fail("Failed to subscribe to label.", err) 517 return 518 } 519 520 // everything succeeded 521 rp.pages.HxRefresh(w) 522} 523 524func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) { 525 user := rp.oauth.GetUser(r) 526 l := rp.logger.With("handler", "UnsubscribeLabel") 527 l = l.With("did", user.Did) 528 529 f, err := rp.repoResolver.Resolve(r) 530 if err != nil { 531 l.Error("failed to get repo and knot", "err", err) 532 return 533 } 534 535 if err := r.ParseForm(); err != nil { 536 l.Error("invalid form", "err", err) 537 return 538 } 539 540 errorId := "default-label-operation" 541 fail := func(msg string, err error) { 542 l.Error(msg, "err", err) 543 rp.pages.Notice(w, errorId, msg) 544 } 545 546 labelAts := r.Form["label"] 547 _, err = db.GetLabelDefinitions(rp.db, db.FilterIn("at_uri", labelAts)) 548 if err != nil { 549 fail("Failed to unsubscribe to label.", err) 550 return 551 } 552 553 // update repo record to remove the label reference 554 newRepo := f.Repo 555 var updated []string 556 for _, l := range newRepo.Labels { 557 if !slices.Contains(labelAts, l) { 558 updated = append(updated, l) 559 } 560 } 561 newRepo.Labels = updated 562 repoRecord := newRepo.AsRecord() 563 564 client, err := rp.oauth.AuthorizedClient(r) 565 if err != nil { 566 fail(err.Error(), err) 567 return 568 } 569 570 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Repo.Did, f.Repo.Rkey) 571 if err != nil { 572 fail("Failed to update labels, no record found on PDS.", err) 573 return 574 } 575 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 576 Collection: tangled.RepoNSID, 577 Repo: newRepo.Did, 578 Rkey: newRepo.Rkey, 579 SwapRecord: ex.Cid, 580 Record: &lexutil.LexiconTypeDecoder{ 581 Val: &repoRecord, 582 }, 583 }) 584 585 err = db.UnsubscribeLabel( 586 rp.db, 587 db.FilterEq("repo_at", f.RepoAt()), 588 db.FilterIn("label_at", labelAts), 589 ) 590 if err != nil { 591 fail("Failed to unsubscribe label.", err) 592 return 593 } 594 595 // everything succeeded 596 rp.pages.HxRefresh(w) 597} 598 599func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) { 600 l := rp.logger.With("handler", "LabelPanel") 601 602 f, err := rp.repoResolver.Resolve(r) 603 if err != nil { 604 l.Error("failed to get repo and knot", "err", err) 605 return 606 } 607 608 subjectStr := r.FormValue("subject") 609 subject, err := syntax.ParseATURI(subjectStr) 610 if err != nil { 611 l.Error("failed to get repo and knot", "err", err) 612 return 613 } 614 615 labelDefs, err := db.GetLabelDefinitions( 616 rp.db, 617 db.FilterIn("at_uri", f.Repo.Labels), 618 db.FilterContains("scope", subject.Collection().String()), 619 ) 620 if err != nil { 621 l.Error("failed to fetch label defs", "err", err) 622 return 623 } 624 625 defs := make(map[string]*models.LabelDefinition) 626 for _, l := range labelDefs { 627 defs[l.AtUri().String()] = &l 628 } 629 630 states, err := db.GetLabels(rp.db, db.FilterEq("subject", subject)) 631 if err != nil { 632 l.Error("failed to build label state", "err", err) 633 return 634 } 635 state := states[subject] 636 637 user := rp.oauth.GetUser(r) 638 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 639 LoggedInUser: user, 640 RepoInfo: f.RepoInfo(user), 641 Defs: defs, 642 Subject: subject.String(), 643 State: state, 644 }) 645} 646 647func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 648 l := rp.logger.With("handler", "EditLabelPanel") 649 650 f, err := rp.repoResolver.Resolve(r) 651 if err != nil { 652 l.Error("failed to get repo and knot", "err", err) 653 return 654 } 655 656 subjectStr := r.FormValue("subject") 657 subject, err := syntax.ParseATURI(subjectStr) 658 if err != nil { 659 l.Error("failed to get repo and knot", "err", err) 660 return 661 } 662 663 labelDefs, err := db.GetLabelDefinitions( 664 rp.db, 665 db.FilterIn("at_uri", f.Repo.Labels), 666 db.FilterContains("scope", subject.Collection().String()), 667 ) 668 if err != nil { 669 l.Error("failed to fetch labels", "err", err) 670 return 671 } 672 673 defs := make(map[string]*models.LabelDefinition) 674 for _, l := range labelDefs { 675 defs[l.AtUri().String()] = &l 676 } 677 678 states, err := db.GetLabels(rp.db, db.FilterEq("subject", subject)) 679 if err != nil { 680 l.Error("failed to build label state", "err", err) 681 return 682 } 683 state := states[subject] 684 685 user := rp.oauth.GetUser(r) 686 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 687 LoggedInUser: user, 688 RepoInfo: f.RepoInfo(user), 689 Defs: defs, 690 Subject: subject.String(), 691 State: state, 692 }) 693} 694 695func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) { 696 user := rp.oauth.GetUser(r) 697 l := rp.logger.With("handler", "AddCollaborator") 698 l = l.With("did", user.Did) 699 700 f, err := rp.repoResolver.Resolve(r) 701 if err != nil { 702 l.Error("failed to get repo and knot", "err", err) 703 return 704 } 705 706 errorId := "add-collaborator-error" 707 fail := func(msg string, err error) { 708 l.Error(msg, "err", err) 709 rp.pages.Notice(w, errorId, msg) 710 } 711 712 collaborator := r.FormValue("collaborator") 713 if collaborator == "" { 714 fail("Invalid form.", nil) 715 return 716 } 717 718 // remove a single leading `@`, to make @handle work with ResolveIdent 719 collaborator = strings.TrimPrefix(collaborator, "@") 720 721 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 722 if err != nil { 723 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 724 return 725 } 726 727 if collaboratorIdent.DID.String() == user.Did { 728 fail("You seem to be adding yourself as a collaborator.", nil) 729 return 730 } 731 l = l.With("collaborator", collaboratorIdent.Handle) 732 l = l.With("knot", f.Knot) 733 734 // announce this relation into the firehose, store into owners' pds 735 client, err := rp.oauth.AuthorizedClient(r) 736 if err != nil { 737 fail("Failed to write to PDS.", err) 738 return 739 } 740 741 // emit a record 742 currentUser := rp.oauth.GetUser(r) 743 rkey := tid.TID() 744 createdAt := time.Now() 745 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 746 Collection: tangled.RepoCollaboratorNSID, 747 Repo: currentUser.Did, 748 Rkey: rkey, 749 Record: &lexutil.LexiconTypeDecoder{ 750 Val: &tangled.RepoCollaborator{ 751 Subject: collaboratorIdent.DID.String(), 752 Repo: string(f.RepoAt()), 753 CreatedAt: createdAt.Format(time.RFC3339), 754 }}, 755 }) 756 // invalid record 757 if err != nil { 758 fail("Failed to write record to PDS.", err) 759 return 760 } 761 762 aturi := resp.Uri 763 l = l.With("at-uri", aturi) 764 l.Info("wrote record to PDS") 765 766 tx, err := rp.db.BeginTx(r.Context(), nil) 767 if err != nil { 768 fail("Failed to add collaborator.", err) 769 return 770 } 771 772 rollback := func() { 773 err1 := tx.Rollback() 774 err2 := rp.enforcer.E.LoadPolicy() 775 err3 := rollbackRecord(context.Background(), aturi, client) 776 777 // ignore txn complete errors, this is okay 778 if errors.Is(err1, sql.ErrTxDone) { 779 err1 = nil 780 } 781 782 if errs := errors.Join(err1, err2, err3); errs != nil { 783 l.Error("failed to rollback changes", "errs", errs) 784 return 785 } 786 } 787 defer rollback() 788 789 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo()) 790 if err != nil { 791 fail("Failed to add collaborator permissions.", err) 792 return 793 } 794 795 err = db.AddCollaborator(tx, models.Collaborator{ 796 Did: syntax.DID(currentUser.Did), 797 Rkey: rkey, 798 SubjectDid: collaboratorIdent.DID, 799 RepoAt: f.RepoAt(), 800 Created: createdAt, 801 }) 802 if err != nil { 803 fail("Failed to add collaborator.", err) 804 return 805 } 806 807 err = tx.Commit() 808 if err != nil { 809 fail("Failed to add collaborator.", err) 810 return 811 } 812 813 err = rp.enforcer.E.SavePolicy() 814 if err != nil { 815 fail("Failed to update collaborator permissions.", err) 816 return 817 } 818 819 // clear aturi to when everything is successful 820 aturi = "" 821 822 rp.pages.HxRefresh(w) 823} 824 825func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) { 826 user := rp.oauth.GetUser(r) 827 l := rp.logger.With("handler", "DeleteRepo") 828 829 noticeId := "operation-error" 830 f, err := rp.repoResolver.Resolve(r) 831 if err != nil { 832 l.Error("failed to get repo and knot", "err", err) 833 return 834 } 835 836 // remove record from pds 837 atpClient, err := rp.oauth.AuthorizedClient(r) 838 if err != nil { 839 l.Error("failed to get authorized client", "err", err) 840 return 841 } 842 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{ 843 Collection: tangled.RepoNSID, 844 Repo: user.Did, 845 Rkey: f.Rkey, 846 }) 847 if err != nil { 848 l.Error("failed to delete record", "err", err) 849 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.") 850 return 851 } 852 l.Info("removed repo record", "aturi", f.RepoAt().String()) 853 854 client, err := rp.oauth.ServiceClient( 855 r, 856 oauth.WithService(f.Knot), 857 oauth.WithLxm(tangled.RepoDeleteNSID), 858 oauth.WithDev(rp.config.Core.Dev), 859 ) 860 if err != nil { 861 l.Error("failed to connect to knot server", "err", err) 862 return 863 } 864 865 err = tangled.RepoDelete( 866 r.Context(), 867 client, 868 &tangled.RepoDelete_Input{ 869 Did: f.OwnerDid(), 870 Name: f.Name, 871 Rkey: f.Rkey, 872 }, 873 ) 874 if err := xrpcclient.HandleXrpcErr(err); err != nil { 875 rp.pages.Notice(w, noticeId, err.Error()) 876 return 877 } 878 l.Info("deleted repo from knot") 879 880 tx, err := rp.db.BeginTx(r.Context(), nil) 881 if err != nil { 882 l.Error("failed to start tx") 883 w.Write(fmt.Append(nil, "failed to add collaborator: ", err)) 884 return 885 } 886 defer func() { 887 tx.Rollback() 888 err = rp.enforcer.E.LoadPolicy() 889 if err != nil { 890 l.Error("failed to rollback policies") 891 } 892 }() 893 894 // remove collaborator RBAC 895 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot) 896 if err != nil { 897 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 898 return 899 } 900 for _, c := range repoCollaborators { 901 did := c[0] 902 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo()) 903 } 904 l.Info("removed collaborators") 905 906 // remove repo RBAC 907 err = rp.enforcer.RemoveRepo(f.OwnerDid(), f.Knot, f.DidSlashRepo()) 908 if err != nil { 909 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 910 return 911 } 912 913 // remove repo from db 914 err = db.RemoveRepo(tx, f.OwnerDid(), f.Name) 915 if err != nil { 916 rp.pages.Notice(w, noticeId, "Failed to update appview") 917 return 918 } 919 l.Info("removed repo from db") 920 921 err = tx.Commit() 922 if err != nil { 923 l.Error("failed to commit changes", "err", err) 924 http.Error(w, err.Error(), http.StatusInternalServerError) 925 return 926 } 927 928 err = rp.enforcer.E.SavePolicy() 929 if err != nil { 930 l.Error("failed to update ACLs", "err", err) 931 http.Error(w, err.Error(), http.StatusInternalServerError) 932 return 933 } 934 935 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.OwnerDid())) 936} 937 938func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 939 l := rp.logger.With("handler", "SyncRepoFork") 940 941 ref := chi.URLParam(r, "ref") 942 ref, _ = url.PathUnescape(ref) 943 944 user := rp.oauth.GetUser(r) 945 f, err := rp.repoResolver.Resolve(r) 946 if err != nil { 947 l.Error("failed to resolve source repo", "err", err) 948 return 949 } 950 951 switch r.Method { 952 case http.MethodPost: 953 client, err := rp.oauth.ServiceClient( 954 r, 955 oauth.WithService(f.Knot), 956 oauth.WithLxm(tangled.RepoForkSyncNSID), 957 oauth.WithDev(rp.config.Core.Dev), 958 ) 959 if err != nil { 960 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 961 return 962 } 963 964 repoInfo := f.RepoInfo(user) 965 if repoInfo.Source == nil { 966 rp.pages.Notice(w, "repo", "This repository is not a fork.") 967 return 968 } 969 970 err = tangled.RepoForkSync( 971 r.Context(), 972 client, 973 &tangled.RepoForkSync_Input{ 974 Did: user.Did, 975 Name: f.Name, 976 Source: repoInfo.Source.RepoAt().String(), 977 Branch: ref, 978 }, 979 ) 980 if err := xrpcclient.HandleXrpcErr(err); err != nil { 981 rp.pages.Notice(w, "repo", err.Error()) 982 return 983 } 984 985 rp.pages.HxRefresh(w) 986 return 987 } 988} 989 990func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 991 l := rp.logger.With("handler", "ForkRepo") 992 993 user := rp.oauth.GetUser(r) 994 f, err := rp.repoResolver.Resolve(r) 995 if err != nil { 996 l.Error("failed to resolve source repo", "err", err) 997 return 998 } 999 1000 switch r.Method { 1001 case http.MethodGet: 1002 user := rp.oauth.GetUser(r) 1003 knots, err := rp.enforcer.GetKnotsForUser(user.Did) 1004 if err != nil { 1005 rp.pages.Notice(w, "repo", "Invalid user account.") 1006 return 1007 } 1008 1009 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1010 LoggedInUser: user, 1011 Knots: knots, 1012 RepoInfo: f.RepoInfo(user), 1013 }) 1014 1015 case http.MethodPost: 1016 l := rp.logger.With("handler", "ForkRepo") 1017 1018 targetKnot := r.FormValue("knot") 1019 if targetKnot == "" { 1020 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.") 1021 return 1022 } 1023 l = l.With("targetKnot", targetKnot) 1024 1025 ok, err := rp.enforcer.E.Enforce(user.Did, targetKnot, targetKnot, "repo:create") 1026 if err != nil || !ok { 1027 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1028 return 1029 } 1030 1031 // choose a name for a fork 1032 forkName := r.FormValue("repo_name") 1033 if forkName == "" { 1034 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1035 return 1036 } 1037 1038 // this check is *only* to see if the forked repo name already exists 1039 // in the user's account. 1040 existingRepo, err := db.GetRepo( 1041 rp.db, 1042 db.FilterEq("did", user.Did), 1043 db.FilterEq("name", forkName), 1044 ) 1045 if err != nil { 1046 if !errors.Is(err, sql.ErrNoRows) { 1047 l.Error("error fetching existing repo from db", "err", err) 1048 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.") 1049 return 1050 } 1051 } else if existingRepo != nil { 1052 // repo with this name already exists 1053 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1054 return 1055 } 1056 l = l.With("forkName", forkName) 1057 1058 uri := "https" 1059 if rp.config.Core.Dev { 1060 uri = "http" 1061 } 1062 1063 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.OwnerDid(), f.Repo.Name) 1064 l = l.With("cloneUrl", forkSourceUrl) 1065 1066 sourceAt := f.RepoAt().String() 1067 1068 // create an atproto record for this fork 1069 rkey := tid.TID() 1070 repo := &models.Repo{ 1071 Did: user.Did, 1072 Name: forkName, 1073 Knot: targetKnot, 1074 Rkey: rkey, 1075 Source: sourceAt, 1076 Description: f.Repo.Description, 1077 Created: time.Now(), 1078 Labels: rp.config.Label.DefaultLabelDefs, 1079 } 1080 record := repo.AsRecord() 1081 1082 atpClient, err := rp.oauth.AuthorizedClient(r) 1083 if err != nil { 1084 l.Error("failed to create xrpcclient", "err", err) 1085 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1086 return 1087 } 1088 1089 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1090 Collection: tangled.RepoNSID, 1091 Repo: user.Did, 1092 Rkey: rkey, 1093 Record: &lexutil.LexiconTypeDecoder{ 1094 Val: &record, 1095 }, 1096 }) 1097 if err != nil { 1098 l.Error("failed to write to PDS", "err", err) 1099 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1100 return 1101 } 1102 1103 aturi := atresp.Uri 1104 l = l.With("aturi", aturi) 1105 l.Info("wrote to PDS") 1106 1107 tx, err := rp.db.BeginTx(r.Context(), nil) 1108 if err != nil { 1109 l.Info("txn failed", "err", err) 1110 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1111 return 1112 } 1113 1114 // The rollback function reverts a few things on failure: 1115 // - the pending txn 1116 // - the ACLs 1117 // - the atproto record created 1118 rollback := func() { 1119 err1 := tx.Rollback() 1120 err2 := rp.enforcer.E.LoadPolicy() 1121 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1122 1123 // ignore txn complete errors, this is okay 1124 if errors.Is(err1, sql.ErrTxDone) { 1125 err1 = nil 1126 } 1127 1128 if errs := errors.Join(err1, err2, err3); errs != nil { 1129 l.Error("failed to rollback changes", "errs", errs) 1130 return 1131 } 1132 } 1133 defer rollback() 1134 1135 client, err := rp.oauth.ServiceClient( 1136 r, 1137 oauth.WithService(targetKnot), 1138 oauth.WithLxm(tangled.RepoCreateNSID), 1139 oauth.WithDev(rp.config.Core.Dev), 1140 ) 1141 if err != nil { 1142 l.Error("could not create service client", "err", err) 1143 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1144 return 1145 } 1146 1147 err = tangled.RepoCreate( 1148 r.Context(), 1149 client, 1150 &tangled.RepoCreate_Input{ 1151 Rkey: rkey, 1152 Source: &forkSourceUrl, 1153 }, 1154 ) 1155 if err := xrpcclient.HandleXrpcErr(err); err != nil { 1156 rp.pages.Notice(w, "repo", err.Error()) 1157 return 1158 } 1159 1160 err = db.AddRepo(tx, repo) 1161 if err != nil { 1162 l.Error("failed to AddRepo", "err", err) 1163 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1164 return 1165 } 1166 1167 // acls 1168 p, _ := securejoin.SecureJoin(user.Did, forkName) 1169 err = rp.enforcer.AddRepo(user.Did, targetKnot, p) 1170 if err != nil { 1171 l.Error("failed to add ACLs", "err", err) 1172 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") 1173 return 1174 } 1175 1176 err = tx.Commit() 1177 if err != nil { 1178 l.Error("failed to commit changes", "err", err) 1179 http.Error(w, err.Error(), http.StatusInternalServerError) 1180 return 1181 } 1182 1183 err = rp.enforcer.E.SavePolicy() 1184 if err != nil { 1185 l.Error("failed to update ACLs", "err", err) 1186 http.Error(w, err.Error(), http.StatusInternalServerError) 1187 return 1188 } 1189 1190 // reset the ATURI because the transaction completed successfully 1191 aturi = "" 1192 1193 rp.notifier.NewRepo(r.Context(), repo) 1194 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName)) 1195 } 1196} 1197 1198// this is used to rollback changes made to the PDS 1199// 1200// it is a no-op if the provided ATURI is empty 1201func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 1202 if aturi == "" { 1203 return nil 1204 } 1205 1206 parsed := syntax.ATURI(aturi) 1207 1208 collection := parsed.Collection().String() 1209 repo := parsed.Authority().String() 1210 rkey := parsed.RecordKey().String() 1211 1212 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1213 Collection: collection, 1214 Repo: repo, 1215 Rkey: rkey, 1216 }) 1217 return err 1218}