forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/notify" 20 "tangled.org/core/appview/oauth" 21 "tangled.org/core/appview/pages" 22 "tangled.org/core/appview/reporesolver" 23 "tangled.org/core/appview/validator" 24 xrpcclient "tangled.org/core/appview/xrpcclient" 25 "tangled.org/core/eventconsumer" 26 "tangled.org/core/idresolver" 27 "tangled.org/core/rbac" 28 "tangled.org/core/tid" 29 "tangled.org/core/xrpc/serviceauth" 30 31 comatproto "github.com/bluesky-social/indigo/api/atproto" 32 atpclient "github.com/bluesky-social/indigo/atproto/client" 33 "github.com/bluesky-social/indigo/atproto/syntax" 34 lexutil "github.com/bluesky-social/indigo/lex/util" 35 securejoin "github.com/cyphar/filepath-securejoin" 36 "github.com/go-chi/chi/v5" 37) 38 39type Repo struct { 40 repoResolver *reporesolver.RepoResolver 41 idResolver *idresolver.Resolver 42 config *config.Config 43 oauth *oauth.OAuth 44 pages *pages.Pages 45 spindlestream *eventconsumer.Consumer 46 db *db.DB 47 enforcer *rbac.Enforcer 48 notifier notify.Notifier 49 logger *slog.Logger 50 serviceAuth *serviceauth.ServiceAuth 51 validator *validator.Validator 52} 53 54func New( 55 oauth *oauth.OAuth, 56 repoResolver *reporesolver.RepoResolver, 57 pages *pages.Pages, 58 spindlestream *eventconsumer.Consumer, 59 idResolver *idresolver.Resolver, 60 db *db.DB, 61 config *config.Config, 62 notifier notify.Notifier, 63 enforcer *rbac.Enforcer, 64 logger *slog.Logger, 65 validator *validator.Validator, 66) *Repo { 67 return &Repo{oauth: oauth, 68 repoResolver: repoResolver, 69 pages: pages, 70 idResolver: idResolver, 71 config: config, 72 spindlestream: spindlestream, 73 db: db, 74 notifier: notifier, 75 enforcer: enforcer, 76 logger: logger, 77 validator: validator, 78 } 79} 80 81// modify the spindle configured for this repo 82func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) { 83 user := rp.oauth.GetUser(r) 84 l := rp.logger.With("handler", "EditSpindle") 85 l = l.With("did", user.Did) 86 87 errorId := "operation-error" 88 fail := func(msg string, err error) { 89 l.Error(msg, "err", err) 90 rp.pages.Notice(w, errorId, msg) 91 } 92 93 f, err := rp.repoResolver.Resolve(r) 94 if err != nil { 95 fail("Failed to resolve repo. Try again later", err) 96 return 97 } 98 99 newSpindle := r.FormValue("spindle") 100 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 101 client, err := rp.oauth.AuthorizedClient(r) 102 if err != nil { 103 fail("Failed to authorize. Try again later.", err) 104 return 105 } 106 107 if !removingSpindle { 108 // ensure that this is a valid spindle for this user 109 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did) 110 if err != nil { 111 fail("Failed to find spindles. Try again later.", err) 112 return 113 } 114 115 if !slices.Contains(validSpindles, newSpindle) { 116 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 117 return 118 } 119 } 120 121 newRepo := f.Repo 122 newRepo.Spindle = newSpindle 123 record := newRepo.AsRecord() 124 125 spindlePtr := &newSpindle 126 if removingSpindle { 127 spindlePtr = nil 128 newRepo.Spindle = "" 129 } 130 131 // optimistic update 132 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr) 133 if err != nil { 134 fail("Failed to update spindle. Try again later.", err) 135 return 136 } 137 138 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 139 if err != nil { 140 fail("Failed to update spindle, no record found on PDS.", err) 141 return 142 } 143 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 144 Collection: tangled.RepoNSID, 145 Repo: newRepo.Did, 146 Rkey: newRepo.Rkey, 147 SwapRecord: ex.Cid, 148 Record: &lexutil.LexiconTypeDecoder{ 149 Val: &record, 150 }, 151 }) 152 153 if err != nil { 154 fail("Failed to update spindle, unable to save to PDS.", err) 155 return 156 } 157 158 if !removingSpindle { 159 // add this spindle to spindle stream 160 rp.spindlestream.AddSource( 161 context.Background(), 162 eventconsumer.NewSpindleSource(newSpindle), 163 ) 164 } 165 166 rp.pages.HxRefresh(w) 167} 168 169func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 170 user := rp.oauth.GetUser(r) 171 l := rp.logger.With("handler", "AddLabel") 172 l = l.With("did", user.Did) 173 174 f, err := rp.repoResolver.Resolve(r) 175 if err != nil { 176 l.Error("failed to get repo and knot", "err", err) 177 return 178 } 179 180 errorId := "add-label-error" 181 fail := func(msg string, err error) { 182 l.Error(msg, "err", err) 183 rp.pages.Notice(w, errorId, msg) 184 } 185 186 // get form values for label definition 187 name := r.FormValue("name") 188 concreteType := r.FormValue("valueType") 189 valueFormat := r.FormValue("valueFormat") 190 enumValues := r.FormValue("enumValues") 191 scope := r.Form["scope"] 192 color := r.FormValue("color") 193 multiple := r.FormValue("multiple") == "true" 194 195 var variants []string 196 for part := range strings.SplitSeq(enumValues, ",") { 197 if part = strings.TrimSpace(part); part != "" { 198 variants = append(variants, part) 199 } 200 } 201 202 if concreteType == "" { 203 concreteType = "null" 204 } 205 206 format := models.ValueTypeFormatAny 207 if valueFormat == "did" { 208 format = models.ValueTypeFormatDid 209 } 210 211 valueType := models.ValueType{ 212 Type: models.ConcreteType(concreteType), 213 Format: format, 214 Enum: variants, 215 } 216 217 label := models.LabelDefinition{ 218 Did: user.Did, 219 Rkey: tid.TID(), 220 Name: name, 221 ValueType: valueType, 222 Scope: scope, 223 Color: &color, 224 Multiple: multiple, 225 Created: time.Now(), 226 } 227 if err := rp.validator.ValidateLabelDefinition(&label); err != nil { 228 fail(err.Error(), err) 229 return 230 } 231 232 // announce this relation into the firehose, store into owners' pds 233 client, err := rp.oauth.AuthorizedClient(r) 234 if err != nil { 235 fail(err.Error(), err) 236 return 237 } 238 239 // emit a labelRecord 240 labelRecord := label.AsRecord() 241 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 242 Collection: tangled.LabelDefinitionNSID, 243 Repo: label.Did, 244 Rkey: label.Rkey, 245 Record: &lexutil.LexiconTypeDecoder{ 246 Val: &labelRecord, 247 }, 248 }) 249 // invalid record 250 if err != nil { 251 fail("Failed to write record to PDS.", err) 252 return 253 } 254 255 aturi := resp.Uri 256 l = l.With("at-uri", aturi) 257 l.Info("wrote label record to PDS") 258 259 // update the repo to subscribe to this label 260 newRepo := f.Repo 261 newRepo.Labels = append(newRepo.Labels, aturi) 262 repoRecord := newRepo.AsRecord() 263 264 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 265 if err != nil { 266 fail("Failed to update labels, no record found on PDS.", err) 267 return 268 } 269 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 270 Collection: tangled.RepoNSID, 271 Repo: newRepo.Did, 272 Rkey: newRepo.Rkey, 273 SwapRecord: ex.Cid, 274 Record: &lexutil.LexiconTypeDecoder{ 275 Val: &repoRecord, 276 }, 277 }) 278 if err != nil { 279 fail("Failed to update labels for repo.", err) 280 return 281 } 282 283 tx, err := rp.db.BeginTx(r.Context(), nil) 284 if err != nil { 285 fail("Failed to add label.", err) 286 return 287 } 288 289 rollback := func() { 290 err1 := tx.Rollback() 291 err2 := rollbackRecord(context.Background(), aturi, client) 292 293 // ignore txn complete errors, this is okay 294 if errors.Is(err1, sql.ErrTxDone) { 295 err1 = nil 296 } 297 298 if errs := errors.Join(err1, err2); errs != nil { 299 l.Error("failed to rollback changes", "errs", errs) 300 return 301 } 302 } 303 defer rollback() 304 305 _, err = db.AddLabelDefinition(tx, &label) 306 if err != nil { 307 fail("Failed to add label.", err) 308 return 309 } 310 311 err = db.SubscribeLabel(tx, &models.RepoLabel{ 312 RepoAt: f.RepoAt(), 313 LabelAt: label.AtUri(), 314 }) 315 316 err = tx.Commit() 317 if err != nil { 318 fail("Failed to add label.", err) 319 return 320 } 321 322 // clear aturi when everything is successful 323 aturi = "" 324 325 rp.pages.HxRefresh(w) 326} 327 328func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) { 329 user := rp.oauth.GetUser(r) 330 l := rp.logger.With("handler", "DeleteLabel") 331 l = l.With("did", user.Did) 332 333 f, err := rp.repoResolver.Resolve(r) 334 if err != nil { 335 l.Error("failed to get repo and knot", "err", err) 336 return 337 } 338 339 errorId := "label-operation" 340 fail := func(msg string, err error) { 341 l.Error(msg, "err", err) 342 rp.pages.Notice(w, errorId, msg) 343 } 344 345 // get form values 346 labelId := r.FormValue("label-id") 347 348 label, err := db.GetLabelDefinition(rp.db, db.FilterEq("id", labelId)) 349 if err != nil { 350 fail("Failed to find label definition.", err) 351 return 352 } 353 354 client, err := rp.oauth.AuthorizedClient(r) 355 if err != nil { 356 fail(err.Error(), err) 357 return 358 } 359 360 // delete label record from PDS 361 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 362 Collection: tangled.LabelDefinitionNSID, 363 Repo: label.Did, 364 Rkey: label.Rkey, 365 }) 366 if err != nil { 367 fail("Failed to delete label record from PDS.", err) 368 return 369 } 370 371 // update repo record to remove the label reference 372 newRepo := f.Repo 373 var updated []string 374 removedAt := label.AtUri().String() 375 for _, l := range newRepo.Labels { 376 if l != removedAt { 377 updated = append(updated, l) 378 } 379 } 380 newRepo.Labels = updated 381 repoRecord := newRepo.AsRecord() 382 383 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 384 if err != nil { 385 fail("Failed to update labels, no record found on PDS.", err) 386 return 387 } 388 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 389 Collection: tangled.RepoNSID, 390 Repo: newRepo.Did, 391 Rkey: newRepo.Rkey, 392 SwapRecord: ex.Cid, 393 Record: &lexutil.LexiconTypeDecoder{ 394 Val: &repoRecord, 395 }, 396 }) 397 if err != nil { 398 fail("Failed to update repo record.", err) 399 return 400 } 401 402 // transaction for DB changes 403 tx, err := rp.db.BeginTx(r.Context(), nil) 404 if err != nil { 405 fail("Failed to delete label.", err) 406 return 407 } 408 defer tx.Rollback() 409 410 err = db.UnsubscribeLabel( 411 tx, 412 db.FilterEq("repo_at", f.RepoAt()), 413 db.FilterEq("label_at", removedAt), 414 ) 415 if err != nil { 416 fail("Failed to unsubscribe label.", err) 417 return 418 } 419 420 err = db.DeleteLabelDefinition(tx, db.FilterEq("id", label.Id)) 421 if err != nil { 422 fail("Failed to delete label definition.", err) 423 return 424 } 425 426 err = tx.Commit() 427 if err != nil { 428 fail("Failed to delete label.", err) 429 return 430 } 431 432 // everything succeeded 433 rp.pages.HxRefresh(w) 434} 435 436func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 437 user := rp.oauth.GetUser(r) 438 l := rp.logger.With("handler", "SubscribeLabel") 439 l = l.With("did", user.Did) 440 441 f, err := rp.repoResolver.Resolve(r) 442 if err != nil { 443 l.Error("failed to get repo and knot", "err", err) 444 return 445 } 446 447 if err := r.ParseForm(); err != nil { 448 l.Error("invalid form", "err", err) 449 return 450 } 451 452 errorId := "default-label-operation" 453 fail := func(msg string, err error) { 454 l.Error(msg, "err", err) 455 rp.pages.Notice(w, errorId, msg) 456 } 457 458 labelAts := r.Form["label"] 459 _, err = db.GetLabelDefinitions(rp.db, db.FilterIn("at_uri", labelAts)) 460 if err != nil { 461 fail("Failed to subscribe to label.", err) 462 return 463 } 464 465 newRepo := f.Repo 466 newRepo.Labels = append(newRepo.Labels, labelAts...) 467 468 // dedup 469 slices.Sort(newRepo.Labels) 470 newRepo.Labels = slices.Compact(newRepo.Labels) 471 472 repoRecord := newRepo.AsRecord() 473 474 client, err := rp.oauth.AuthorizedClient(r) 475 if err != nil { 476 fail(err.Error(), err) 477 return 478 } 479 480 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Repo.Did, f.Repo.Rkey) 481 if err != nil { 482 fail("Failed to update labels, no record found on PDS.", err) 483 return 484 } 485 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 486 Collection: tangled.RepoNSID, 487 Repo: newRepo.Did, 488 Rkey: newRepo.Rkey, 489 SwapRecord: ex.Cid, 490 Record: &lexutil.LexiconTypeDecoder{ 491 Val: &repoRecord, 492 }, 493 }) 494 495 tx, err := rp.db.Begin() 496 if err != nil { 497 fail("Failed to subscribe to label.", err) 498 return 499 } 500 defer tx.Rollback() 501 502 for _, l := range labelAts { 503 err = db.SubscribeLabel(tx, &models.RepoLabel{ 504 RepoAt: f.RepoAt(), 505 LabelAt: syntax.ATURI(l), 506 }) 507 if err != nil { 508 fail("Failed to subscribe to label.", err) 509 return 510 } 511 } 512 513 if err := tx.Commit(); err != nil { 514 fail("Failed to subscribe to label.", err) 515 return 516 } 517 518 // everything succeeded 519 rp.pages.HxRefresh(w) 520} 521 522func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) { 523 user := rp.oauth.GetUser(r) 524 l := rp.logger.With("handler", "UnsubscribeLabel") 525 l = l.With("did", user.Did) 526 527 f, err := rp.repoResolver.Resolve(r) 528 if err != nil { 529 l.Error("failed to get repo and knot", "err", err) 530 return 531 } 532 533 if err := r.ParseForm(); err != nil { 534 l.Error("invalid form", "err", err) 535 return 536 } 537 538 errorId := "default-label-operation" 539 fail := func(msg string, err error) { 540 l.Error(msg, "err", err) 541 rp.pages.Notice(w, errorId, msg) 542 } 543 544 labelAts := r.Form["label"] 545 _, err = db.GetLabelDefinitions(rp.db, db.FilterIn("at_uri", labelAts)) 546 if err != nil { 547 fail("Failed to unsubscribe to label.", err) 548 return 549 } 550 551 // update repo record to remove the label reference 552 newRepo := f.Repo 553 var updated []string 554 for _, l := range newRepo.Labels { 555 if !slices.Contains(labelAts, l) { 556 updated = append(updated, l) 557 } 558 } 559 newRepo.Labels = updated 560 repoRecord := newRepo.AsRecord() 561 562 client, err := rp.oauth.AuthorizedClient(r) 563 if err != nil { 564 fail(err.Error(), err) 565 return 566 } 567 568 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Repo.Did, f.Repo.Rkey) 569 if err != nil { 570 fail("Failed to update labels, no record found on PDS.", err) 571 return 572 } 573 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 574 Collection: tangled.RepoNSID, 575 Repo: newRepo.Did, 576 Rkey: newRepo.Rkey, 577 SwapRecord: ex.Cid, 578 Record: &lexutil.LexiconTypeDecoder{ 579 Val: &repoRecord, 580 }, 581 }) 582 583 err = db.UnsubscribeLabel( 584 rp.db, 585 db.FilterEq("repo_at", f.RepoAt()), 586 db.FilterIn("label_at", labelAts), 587 ) 588 if err != nil { 589 fail("Failed to unsubscribe label.", err) 590 return 591 } 592 593 // everything succeeded 594 rp.pages.HxRefresh(w) 595} 596 597func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) { 598 l := rp.logger.With("handler", "LabelPanel") 599 600 f, err := rp.repoResolver.Resolve(r) 601 if err != nil { 602 l.Error("failed to get repo and knot", "err", err) 603 return 604 } 605 606 subjectStr := r.FormValue("subject") 607 subject, err := syntax.ParseATURI(subjectStr) 608 if err != nil { 609 l.Error("failed to get repo and knot", "err", err) 610 return 611 } 612 613 labelDefs, err := db.GetLabelDefinitions( 614 rp.db, 615 db.FilterIn("at_uri", f.Repo.Labels), 616 db.FilterContains("scope", subject.Collection().String()), 617 ) 618 if err != nil { 619 l.Error("failed to fetch label defs", "err", err) 620 return 621 } 622 623 defs := make(map[string]*models.LabelDefinition) 624 for _, l := range labelDefs { 625 defs[l.AtUri().String()] = &l 626 } 627 628 states, err := db.GetLabels(rp.db, db.FilterEq("subject", subject)) 629 if err != nil { 630 l.Error("failed to build label state", "err", err) 631 return 632 } 633 state := states[subject] 634 635 user := rp.oauth.GetUser(r) 636 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 637 LoggedInUser: user, 638 RepoInfo: f.RepoInfo(user), 639 Defs: defs, 640 Subject: subject.String(), 641 State: state, 642 }) 643} 644 645func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 646 l := rp.logger.With("handler", "EditLabelPanel") 647 648 f, err := rp.repoResolver.Resolve(r) 649 if err != nil { 650 l.Error("failed to get repo and knot", "err", err) 651 return 652 } 653 654 subjectStr := r.FormValue("subject") 655 subject, err := syntax.ParseATURI(subjectStr) 656 if err != nil { 657 l.Error("failed to get repo and knot", "err", err) 658 return 659 } 660 661 labelDefs, err := db.GetLabelDefinitions( 662 rp.db, 663 db.FilterIn("at_uri", f.Repo.Labels), 664 db.FilterContains("scope", subject.Collection().String()), 665 ) 666 if err != nil { 667 l.Error("failed to fetch labels", "err", err) 668 return 669 } 670 671 defs := make(map[string]*models.LabelDefinition) 672 for _, l := range labelDefs { 673 defs[l.AtUri().String()] = &l 674 } 675 676 states, err := db.GetLabels(rp.db, db.FilterEq("subject", subject)) 677 if err != nil { 678 l.Error("failed to build label state", "err", err) 679 return 680 } 681 state := states[subject] 682 683 user := rp.oauth.GetUser(r) 684 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 685 LoggedInUser: user, 686 RepoInfo: f.RepoInfo(user), 687 Defs: defs, 688 Subject: subject.String(), 689 State: state, 690 }) 691} 692 693func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) { 694 user := rp.oauth.GetUser(r) 695 l := rp.logger.With("handler", "AddCollaborator") 696 l = l.With("did", user.Did) 697 698 f, err := rp.repoResolver.Resolve(r) 699 if err != nil { 700 l.Error("failed to get repo and knot", "err", err) 701 return 702 } 703 704 errorId := "add-collaborator-error" 705 fail := func(msg string, err error) { 706 l.Error(msg, "err", err) 707 rp.pages.Notice(w, errorId, msg) 708 } 709 710 collaborator := r.FormValue("collaborator") 711 if collaborator == "" { 712 fail("Invalid form.", nil) 713 return 714 } 715 716 // remove a single leading `@`, to make @handle work with ResolveIdent 717 collaborator = strings.TrimPrefix(collaborator, "@") 718 719 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 720 if err != nil { 721 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 722 return 723 } 724 725 if collaboratorIdent.DID.String() == user.Did { 726 fail("You seem to be adding yourself as a collaborator.", nil) 727 return 728 } 729 l = l.With("collaborator", collaboratorIdent.Handle) 730 l = l.With("knot", f.Knot) 731 732 // announce this relation into the firehose, store into owners' pds 733 client, err := rp.oauth.AuthorizedClient(r) 734 if err != nil { 735 fail("Failed to write to PDS.", err) 736 return 737 } 738 739 // emit a record 740 currentUser := rp.oauth.GetUser(r) 741 rkey := tid.TID() 742 createdAt := time.Now() 743 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 744 Collection: tangled.RepoCollaboratorNSID, 745 Repo: currentUser.Did, 746 Rkey: rkey, 747 Record: &lexutil.LexiconTypeDecoder{ 748 Val: &tangled.RepoCollaborator{ 749 Subject: collaboratorIdent.DID.String(), 750 Repo: string(f.RepoAt()), 751 CreatedAt: createdAt.Format(time.RFC3339), 752 }}, 753 }) 754 // invalid record 755 if err != nil { 756 fail("Failed to write record to PDS.", err) 757 return 758 } 759 760 aturi := resp.Uri 761 l = l.With("at-uri", aturi) 762 l.Info("wrote record to PDS") 763 764 tx, err := rp.db.BeginTx(r.Context(), nil) 765 if err != nil { 766 fail("Failed to add collaborator.", err) 767 return 768 } 769 770 rollback := func() { 771 err1 := tx.Rollback() 772 err2 := rp.enforcer.E.LoadPolicy() 773 err3 := rollbackRecord(context.Background(), aturi, client) 774 775 // ignore txn complete errors, this is okay 776 if errors.Is(err1, sql.ErrTxDone) { 777 err1 = nil 778 } 779 780 if errs := errors.Join(err1, err2, err3); errs != nil { 781 l.Error("failed to rollback changes", "errs", errs) 782 return 783 } 784 } 785 defer rollback() 786 787 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo()) 788 if err != nil { 789 fail("Failed to add collaborator permissions.", err) 790 return 791 } 792 793 err = db.AddCollaborator(tx, models.Collaborator{ 794 Did: syntax.DID(currentUser.Did), 795 Rkey: rkey, 796 SubjectDid: collaboratorIdent.DID, 797 RepoAt: f.RepoAt(), 798 Created: createdAt, 799 }) 800 if err != nil { 801 fail("Failed to add collaborator.", err) 802 return 803 } 804 805 err = tx.Commit() 806 if err != nil { 807 fail("Failed to add collaborator.", err) 808 return 809 } 810 811 err = rp.enforcer.E.SavePolicy() 812 if err != nil { 813 fail("Failed to update collaborator permissions.", err) 814 return 815 } 816 817 // clear aturi to when everything is successful 818 aturi = "" 819 820 rp.pages.HxRefresh(w) 821} 822 823func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) { 824 user := rp.oauth.GetUser(r) 825 l := rp.logger.With("handler", "DeleteRepo") 826 827 noticeId := "operation-error" 828 f, err := rp.repoResolver.Resolve(r) 829 if err != nil { 830 l.Error("failed to get repo and knot", "err", err) 831 return 832 } 833 834 // remove record from pds 835 atpClient, err := rp.oauth.AuthorizedClient(r) 836 if err != nil { 837 l.Error("failed to get authorized client", "err", err) 838 return 839 } 840 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{ 841 Collection: tangled.RepoNSID, 842 Repo: user.Did, 843 Rkey: f.Rkey, 844 }) 845 if err != nil { 846 l.Error("failed to delete record", "err", err) 847 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.") 848 return 849 } 850 l.Info("removed repo record", "aturi", f.RepoAt().String()) 851 852 client, err := rp.oauth.ServiceClient( 853 r, 854 oauth.WithService(f.Knot), 855 oauth.WithLxm(tangled.RepoDeleteNSID), 856 oauth.WithDev(rp.config.Core.Dev), 857 ) 858 if err != nil { 859 l.Error("failed to connect to knot server", "err", err) 860 return 861 } 862 863 err = tangled.RepoDelete( 864 r.Context(), 865 client, 866 &tangled.RepoDelete_Input{ 867 Did: f.OwnerDid(), 868 Name: f.Name, 869 Rkey: f.Rkey, 870 }, 871 ) 872 if err := xrpcclient.HandleXrpcErr(err); err != nil { 873 rp.pages.Notice(w, noticeId, err.Error()) 874 return 875 } 876 l.Info("deleted repo from knot") 877 878 tx, err := rp.db.BeginTx(r.Context(), nil) 879 if err != nil { 880 l.Error("failed to start tx") 881 w.Write(fmt.Append(nil, "failed to add collaborator: ", err)) 882 return 883 } 884 defer func() { 885 tx.Rollback() 886 err = rp.enforcer.E.LoadPolicy() 887 if err != nil { 888 l.Error("failed to rollback policies") 889 } 890 }() 891 892 // remove collaborator RBAC 893 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot) 894 if err != nil { 895 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 896 return 897 } 898 for _, c := range repoCollaborators { 899 did := c[0] 900 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo()) 901 } 902 l.Info("removed collaborators") 903 904 // remove repo RBAC 905 err = rp.enforcer.RemoveRepo(f.OwnerDid(), f.Knot, f.DidSlashRepo()) 906 if err != nil { 907 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 908 return 909 } 910 911 // remove repo from db 912 err = db.RemoveRepo(tx, f.OwnerDid(), f.Name) 913 if err != nil { 914 rp.pages.Notice(w, noticeId, "Failed to update appview") 915 return 916 } 917 l.Info("removed repo from db") 918 919 err = tx.Commit() 920 if err != nil { 921 l.Error("failed to commit changes", "err", err) 922 http.Error(w, err.Error(), http.StatusInternalServerError) 923 return 924 } 925 926 err = rp.enforcer.E.SavePolicy() 927 if err != nil { 928 l.Error("failed to update ACLs", "err", err) 929 http.Error(w, err.Error(), http.StatusInternalServerError) 930 return 931 } 932 933 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.OwnerDid())) 934} 935 936func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 937 l := rp.logger.With("handler", "SyncRepoFork") 938 939 ref := chi.URLParam(r, "ref") 940 ref, _ = url.PathUnescape(ref) 941 942 user := rp.oauth.GetUser(r) 943 f, err := rp.repoResolver.Resolve(r) 944 if err != nil { 945 l.Error("failed to resolve source repo", "err", err) 946 return 947 } 948 949 switch r.Method { 950 case http.MethodPost: 951 client, err := rp.oauth.ServiceClient( 952 r, 953 oauth.WithService(f.Knot), 954 oauth.WithLxm(tangled.RepoForkSyncNSID), 955 oauth.WithDev(rp.config.Core.Dev), 956 ) 957 if err != nil { 958 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 959 return 960 } 961 962 repoInfo := f.RepoInfo(user) 963 if repoInfo.Source == nil { 964 rp.pages.Notice(w, "repo", "This repository is not a fork.") 965 return 966 } 967 968 err = tangled.RepoForkSync( 969 r.Context(), 970 client, 971 &tangled.RepoForkSync_Input{ 972 Did: user.Did, 973 Name: f.Name, 974 Source: repoInfo.Source.RepoAt().String(), 975 Branch: ref, 976 }, 977 ) 978 if err := xrpcclient.HandleXrpcErr(err); err != nil { 979 rp.pages.Notice(w, "repo", err.Error()) 980 return 981 } 982 983 rp.pages.HxRefresh(w) 984 return 985 } 986} 987 988func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 989 l := rp.logger.With("handler", "ForkRepo") 990 991 user := rp.oauth.GetUser(r) 992 f, err := rp.repoResolver.Resolve(r) 993 if err != nil { 994 l.Error("failed to resolve source repo", "err", err) 995 return 996 } 997 998 switch r.Method { 999 case http.MethodGet: 1000 user := rp.oauth.GetUser(r) 1001 knots, err := rp.enforcer.GetKnotsForUser(user.Did) 1002 if err != nil { 1003 rp.pages.Notice(w, "repo", "Invalid user account.") 1004 return 1005 } 1006 1007 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1008 LoggedInUser: user, 1009 Knots: knots, 1010 RepoInfo: f.RepoInfo(user), 1011 }) 1012 1013 case http.MethodPost: 1014 l := rp.logger.With("handler", "ForkRepo") 1015 1016 targetKnot := r.FormValue("knot") 1017 if targetKnot == "" { 1018 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.") 1019 return 1020 } 1021 l = l.With("targetKnot", targetKnot) 1022 1023 ok, err := rp.enforcer.E.Enforce(user.Did, targetKnot, targetKnot, "repo:create") 1024 if err != nil || !ok { 1025 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1026 return 1027 } 1028 1029 // choose a name for a fork 1030 forkName := r.FormValue("repo_name") 1031 if forkName == "" { 1032 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1033 return 1034 } 1035 1036 // this check is *only* to see if the forked repo name already exists 1037 // in the user's account. 1038 existingRepo, err := db.GetRepo( 1039 rp.db, 1040 db.FilterEq("did", user.Did), 1041 db.FilterEq("name", forkName), 1042 ) 1043 if err != nil { 1044 if !errors.Is(err, sql.ErrNoRows) { 1045 l.Error("error fetching existing repo from db", "err", err) 1046 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.") 1047 return 1048 } 1049 } else if existingRepo != nil { 1050 // repo with this name already exists 1051 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1052 return 1053 } 1054 l = l.With("forkName", forkName) 1055 1056 uri := "https" 1057 if rp.config.Core.Dev { 1058 uri = "http" 1059 } 1060 1061 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.OwnerDid(), f.Repo.Name) 1062 l = l.With("cloneUrl", forkSourceUrl) 1063 1064 sourceAt := f.RepoAt().String() 1065 1066 // create an atproto record for this fork 1067 rkey := tid.TID() 1068 repo := &models.Repo{ 1069 Did: user.Did, 1070 Name: forkName, 1071 Knot: targetKnot, 1072 Rkey: rkey, 1073 Source: sourceAt, 1074 Description: f.Repo.Description, 1075 Created: time.Now(), 1076 Labels: rp.config.Label.DefaultLabelDefs, 1077 } 1078 record := repo.AsRecord() 1079 1080 atpClient, err := rp.oauth.AuthorizedClient(r) 1081 if err != nil { 1082 l.Error("failed to create xrpcclient", "err", err) 1083 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1084 return 1085 } 1086 1087 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1088 Collection: tangled.RepoNSID, 1089 Repo: user.Did, 1090 Rkey: rkey, 1091 Record: &lexutil.LexiconTypeDecoder{ 1092 Val: &record, 1093 }, 1094 }) 1095 if err != nil { 1096 l.Error("failed to write to PDS", "err", err) 1097 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1098 return 1099 } 1100 1101 aturi := atresp.Uri 1102 l = l.With("aturi", aturi) 1103 l.Info("wrote to PDS") 1104 1105 tx, err := rp.db.BeginTx(r.Context(), nil) 1106 if err != nil { 1107 l.Info("txn failed", "err", err) 1108 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1109 return 1110 } 1111 1112 // The rollback function reverts a few things on failure: 1113 // - the pending txn 1114 // - the ACLs 1115 // - the atproto record created 1116 rollback := func() { 1117 err1 := tx.Rollback() 1118 err2 := rp.enforcer.E.LoadPolicy() 1119 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1120 1121 // ignore txn complete errors, this is okay 1122 if errors.Is(err1, sql.ErrTxDone) { 1123 err1 = nil 1124 } 1125 1126 if errs := errors.Join(err1, err2, err3); errs != nil { 1127 l.Error("failed to rollback changes", "errs", errs) 1128 return 1129 } 1130 } 1131 defer rollback() 1132 1133 client, err := rp.oauth.ServiceClient( 1134 r, 1135 oauth.WithService(targetKnot), 1136 oauth.WithLxm(tangled.RepoCreateNSID), 1137 oauth.WithDev(rp.config.Core.Dev), 1138 ) 1139 if err != nil { 1140 l.Error("could not create service client", "err", err) 1141 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1142 return 1143 } 1144 1145 err = tangled.RepoCreate( 1146 r.Context(), 1147 client, 1148 &tangled.RepoCreate_Input{ 1149 Rkey: rkey, 1150 Source: &forkSourceUrl, 1151 }, 1152 ) 1153 if err := xrpcclient.HandleXrpcErr(err); err != nil { 1154 rp.pages.Notice(w, "repo", err.Error()) 1155 return 1156 } 1157 1158 err = db.AddRepo(tx, repo) 1159 if err != nil { 1160 l.Error("failed to AddRepo", "err", err) 1161 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1162 return 1163 } 1164 1165 // acls 1166 p, _ := securejoin.SecureJoin(user.Did, forkName) 1167 err = rp.enforcer.AddRepo(user.Did, targetKnot, p) 1168 if err != nil { 1169 l.Error("failed to add ACLs", "err", err) 1170 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") 1171 return 1172 } 1173 1174 err = tx.Commit() 1175 if err != nil { 1176 l.Error("failed to commit changes", "err", err) 1177 http.Error(w, err.Error(), http.StatusInternalServerError) 1178 return 1179 } 1180 1181 err = rp.enforcer.E.SavePolicy() 1182 if err != nil { 1183 l.Error("failed to update ACLs", "err", err) 1184 http.Error(w, err.Error(), http.StatusInternalServerError) 1185 return 1186 } 1187 1188 // reset the ATURI because the transaction completed successfully 1189 aturi = "" 1190 1191 rp.notifier.NewRepo(r.Context(), repo) 1192 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName)) 1193 } 1194} 1195 1196// this is used to rollback changes made to the PDS 1197// 1198// it is a no-op if the provided ATURI is empty 1199func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 1200 if aturi == "" { 1201 return nil 1202 } 1203 1204 parsed := syntax.ATURI(aturi) 1205 1206 collection := parsed.Collection().String() 1207 repo := parsed.Authority().String() 1208 rkey := parsed.RecordKey().String() 1209 1210 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1211 Collection: collection, 1212 Repo: repo, 1213 Rkey: rkey, 1214 }) 1215 return err 1216}