forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 30 kB view raw
1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/notify" 20 "tangled.org/core/appview/oauth" 21 "tangled.org/core/appview/pages" 22 "tangled.org/core/appview/reporesolver" 23 "tangled.org/core/appview/validator" 24 xrpcclient "tangled.org/core/appview/xrpcclient" 25 "tangled.org/core/eventconsumer" 26 "tangled.org/core/idresolver" 27 "tangled.org/core/orm" 28 "tangled.org/core/rbac" 29 "tangled.org/core/tid" 30 "tangled.org/core/xrpc/serviceauth" 31 32 comatproto "github.com/bluesky-social/indigo/api/atproto" 33 atpclient "github.com/bluesky-social/indigo/atproto/client" 34 "github.com/bluesky-social/indigo/atproto/syntax" 35 lexutil "github.com/bluesky-social/indigo/lex/util" 36 securejoin "github.com/cyphar/filepath-securejoin" 37 "github.com/go-chi/chi/v5" 38) 39 40type Repo struct { 41 repoResolver *reporesolver.RepoResolver 42 idResolver *idresolver.Resolver 43 config *config.Config 44 oauth *oauth.OAuth 45 pages *pages.Pages 46 spindlestream *eventconsumer.Consumer 47 db *db.DB 48 enforcer *rbac.Enforcer 49 notifier notify.Notifier 50 logger *slog.Logger 51 serviceAuth *serviceauth.ServiceAuth 52 validator *validator.Validator 53} 54 55func New( 56 oauth *oauth.OAuth, 57 repoResolver *reporesolver.RepoResolver, 58 pages *pages.Pages, 59 spindlestream *eventconsumer.Consumer, 60 idResolver *idresolver.Resolver, 61 db *db.DB, 62 config *config.Config, 63 notifier notify.Notifier, 64 enforcer *rbac.Enforcer, 65 logger *slog.Logger, 66 validator *validator.Validator, 67) *Repo { 68 return &Repo{oauth: oauth, 69 repoResolver: repoResolver, 70 pages: pages, 71 idResolver: idResolver, 72 config: config, 73 spindlestream: spindlestream, 74 db: db, 75 notifier: notifier, 76 enforcer: enforcer, 77 logger: logger, 78 validator: validator, 79 } 80} 81 82// modify the spindle configured for this repo 83func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) { 84 user := rp.oauth.GetUser(r) 85 l := rp.logger.With("handler", "EditSpindle") 86 l = l.With("did", user.Did) 87 88 errorId := "operation-error" 89 fail := func(msg string, err error) { 90 l.Error(msg, "err", err) 91 rp.pages.Notice(w, errorId, msg) 92 } 93 94 f, err := rp.repoResolver.Resolve(r) 95 if err != nil { 96 fail("Failed to resolve repo. Try again later", err) 97 return 98 } 99 100 newSpindle := r.FormValue("spindle") 101 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 102 client, err := rp.oauth.AuthorizedClient(r) 103 if err != nil { 104 fail("Failed to authorize. Try again later.", err) 105 return 106 } 107 108 if !removingSpindle { 109 // ensure that this is a valid spindle for this user 110 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did) 111 if err != nil { 112 fail("Failed to find spindles. Try again later.", err) 113 return 114 } 115 116 if !slices.Contains(validSpindles, newSpindle) { 117 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 118 return 119 } 120 } 121 122 newRepo := *f 123 newRepo.Spindle = newSpindle 124 record := newRepo.AsRecord() 125 126 spindlePtr := &newSpindle 127 if removingSpindle { 128 spindlePtr = nil 129 newRepo.Spindle = "" 130 } 131 132 // optimistic update 133 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr) 134 if err != nil { 135 fail("Failed to update spindle. Try again later.", err) 136 return 137 } 138 139 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 140 if err != nil { 141 fail("Failed to update spindle, no record found on PDS.", err) 142 return 143 } 144 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 145 Collection: tangled.RepoNSID, 146 Repo: newRepo.Did, 147 Rkey: newRepo.Rkey, 148 SwapRecord: ex.Cid, 149 Record: &lexutil.LexiconTypeDecoder{ 150 Val: &record, 151 }, 152 }) 153 154 if err != nil { 155 fail("Failed to update spindle, unable to save to PDS.", err) 156 return 157 } 158 159 if !removingSpindle { 160 // add this spindle to spindle stream 161 rp.spindlestream.AddSource( 162 context.Background(), 163 eventconsumer.NewSpindleSource(newSpindle), 164 ) 165 } 166 167 rp.pages.HxRefresh(w) 168} 169 170func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 171 user := rp.oauth.GetUser(r) 172 l := rp.logger.With("handler", "AddLabel") 173 l = l.With("did", user.Did) 174 175 f, err := rp.repoResolver.Resolve(r) 176 if err != nil { 177 l.Error("failed to get repo and knot", "err", err) 178 return 179 } 180 181 errorId := "add-label-error" 182 fail := func(msg string, err error) { 183 l.Error(msg, "err", err) 184 rp.pages.Notice(w, errorId, msg) 185 } 186 187 // get form values for label definition 188 name := r.FormValue("name") 189 concreteType := r.FormValue("valueType") 190 valueFormat := r.FormValue("valueFormat") 191 enumValues := r.FormValue("enumValues") 192 scope := r.Form["scope"] 193 color := r.FormValue("color") 194 multiple := r.FormValue("multiple") == "true" 195 196 var variants []string 197 for part := range strings.SplitSeq(enumValues, ",") { 198 if part = strings.TrimSpace(part); part != "" { 199 variants = append(variants, part) 200 } 201 } 202 203 if concreteType == "" { 204 concreteType = "null" 205 } 206 207 format := models.ValueTypeFormatAny 208 if valueFormat == "did" { 209 format = models.ValueTypeFormatDid 210 } 211 212 valueType := models.ValueType{ 213 Type: models.ConcreteType(concreteType), 214 Format: format, 215 Enum: variants, 216 } 217 218 label := models.LabelDefinition{ 219 Did: user.Did, 220 Rkey: tid.TID(), 221 Name: name, 222 ValueType: valueType, 223 Scope: scope, 224 Color: &color, 225 Multiple: multiple, 226 Created: time.Now(), 227 } 228 if err := rp.validator.ValidateLabelDefinition(&label); err != nil { 229 fail(err.Error(), err) 230 return 231 } 232 233 // announce this relation into the firehose, store into owners' pds 234 client, err := rp.oauth.AuthorizedClient(r) 235 if err != nil { 236 fail(err.Error(), err) 237 return 238 } 239 240 // emit a labelRecord 241 labelRecord := label.AsRecord() 242 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 243 Collection: tangled.LabelDefinitionNSID, 244 Repo: label.Did, 245 Rkey: label.Rkey, 246 Record: &lexutil.LexiconTypeDecoder{ 247 Val: &labelRecord, 248 }, 249 }) 250 // invalid record 251 if err != nil { 252 fail("Failed to write record to PDS.", err) 253 return 254 } 255 256 aturi := resp.Uri 257 l = l.With("at-uri", aturi) 258 l.Info("wrote label record to PDS") 259 260 // update the repo to subscribe to this label 261 newRepo := *f 262 newRepo.Labels = append(newRepo.Labels, aturi) 263 repoRecord := newRepo.AsRecord() 264 265 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 266 if err != nil { 267 fail("Failed to update labels, no record found on PDS.", err) 268 return 269 } 270 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 271 Collection: tangled.RepoNSID, 272 Repo: newRepo.Did, 273 Rkey: newRepo.Rkey, 274 SwapRecord: ex.Cid, 275 Record: &lexutil.LexiconTypeDecoder{ 276 Val: &repoRecord, 277 }, 278 }) 279 if err != nil { 280 fail("Failed to update labels for repo.", err) 281 return 282 } 283 284 tx, err := rp.db.BeginTx(r.Context(), nil) 285 if err != nil { 286 fail("Failed to add label.", err) 287 return 288 } 289 290 rollback := func() { 291 err1 := tx.Rollback() 292 err2 := rollbackRecord(context.Background(), aturi, client) 293 294 // ignore txn complete errors, this is okay 295 if errors.Is(err1, sql.ErrTxDone) { 296 err1 = nil 297 } 298 299 if errs := errors.Join(err1, err2); errs != nil { 300 l.Error("failed to rollback changes", "errs", errs) 301 return 302 } 303 } 304 defer rollback() 305 306 _, err = db.AddLabelDefinition(tx, &label) 307 if err != nil { 308 fail("Failed to add label.", err) 309 return 310 } 311 312 err = db.SubscribeLabel(tx, &models.RepoLabel{ 313 RepoAt: f.RepoAt(), 314 LabelAt: label.AtUri(), 315 }) 316 317 err = tx.Commit() 318 if err != nil { 319 fail("Failed to add label.", err) 320 return 321 } 322 323 // clear aturi when everything is successful 324 aturi = "" 325 326 rp.pages.HxRefresh(w) 327} 328 329func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) { 330 user := rp.oauth.GetUser(r) 331 l := rp.logger.With("handler", "DeleteLabel") 332 l = l.With("did", user.Did) 333 334 f, err := rp.repoResolver.Resolve(r) 335 if err != nil { 336 l.Error("failed to get repo and knot", "err", err) 337 return 338 } 339 340 errorId := "label-operation" 341 fail := func(msg string, err error) { 342 l.Error(msg, "err", err) 343 rp.pages.Notice(w, errorId, msg) 344 } 345 346 // get form values 347 labelId := r.FormValue("label-id") 348 349 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId)) 350 if err != nil { 351 fail("Failed to find label definition.", err) 352 return 353 } 354 355 client, err := rp.oauth.AuthorizedClient(r) 356 if err != nil { 357 fail(err.Error(), err) 358 return 359 } 360 361 // delete label record from PDS 362 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 363 Collection: tangled.LabelDefinitionNSID, 364 Repo: label.Did, 365 Rkey: label.Rkey, 366 }) 367 if err != nil { 368 fail("Failed to delete label record from PDS.", err) 369 return 370 } 371 372 // update repo record to remove the label reference 373 newRepo := *f 374 var updated []string 375 removedAt := label.AtUri().String() 376 for _, l := range newRepo.Labels { 377 if l != removedAt { 378 updated = append(updated, l) 379 } 380 } 381 newRepo.Labels = updated 382 repoRecord := newRepo.AsRecord() 383 384 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 385 if err != nil { 386 fail("Failed to update labels, no record found on PDS.", err) 387 return 388 } 389 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 390 Collection: tangled.RepoNSID, 391 Repo: newRepo.Did, 392 Rkey: newRepo.Rkey, 393 SwapRecord: ex.Cid, 394 Record: &lexutil.LexiconTypeDecoder{ 395 Val: &repoRecord, 396 }, 397 }) 398 if err != nil { 399 fail("Failed to update repo record.", err) 400 return 401 } 402 403 // transaction for DB changes 404 tx, err := rp.db.BeginTx(r.Context(), nil) 405 if err != nil { 406 fail("Failed to delete label.", err) 407 return 408 } 409 defer tx.Rollback() 410 411 err = db.UnsubscribeLabel( 412 tx, 413 orm.FilterEq("repo_at", f.RepoAt()), 414 orm.FilterEq("label_at", removedAt), 415 ) 416 if err != nil { 417 fail("Failed to unsubscribe label.", err) 418 return 419 } 420 421 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id)) 422 if err != nil { 423 fail("Failed to delete label definition.", err) 424 return 425 } 426 427 err = tx.Commit() 428 if err != nil { 429 fail("Failed to delete label.", err) 430 return 431 } 432 433 // everything succeeded 434 rp.pages.HxRefresh(w) 435} 436 437func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 438 user := rp.oauth.GetUser(r) 439 l := rp.logger.With("handler", "SubscribeLabel") 440 l = l.With("did", user.Did) 441 442 f, err := rp.repoResolver.Resolve(r) 443 if err != nil { 444 l.Error("failed to get repo and knot", "err", err) 445 return 446 } 447 448 if err := r.ParseForm(); err != nil { 449 l.Error("invalid form", "err", err) 450 return 451 } 452 453 errorId := "default-label-operation" 454 fail := func(msg string, err error) { 455 l.Error(msg, "err", err) 456 rp.pages.Notice(w, errorId, msg) 457 } 458 459 labelAts := r.Form["label"] 460 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 461 if err != nil { 462 fail("Failed to subscribe to label.", err) 463 return 464 } 465 466 newRepo := *f 467 newRepo.Labels = append(newRepo.Labels, labelAts...) 468 469 // dedup 470 slices.Sort(newRepo.Labels) 471 newRepo.Labels = slices.Compact(newRepo.Labels) 472 473 repoRecord := newRepo.AsRecord() 474 475 client, err := rp.oauth.AuthorizedClient(r) 476 if err != nil { 477 fail(err.Error(), err) 478 return 479 } 480 481 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 482 if err != nil { 483 fail("Failed to update labels, no record found on PDS.", err) 484 return 485 } 486 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 487 Collection: tangled.RepoNSID, 488 Repo: newRepo.Did, 489 Rkey: newRepo.Rkey, 490 SwapRecord: ex.Cid, 491 Record: &lexutil.LexiconTypeDecoder{ 492 Val: &repoRecord, 493 }, 494 }) 495 496 tx, err := rp.db.Begin() 497 if err != nil { 498 fail("Failed to subscribe to label.", err) 499 return 500 } 501 defer tx.Rollback() 502 503 for _, l := range labelAts { 504 err = db.SubscribeLabel(tx, &models.RepoLabel{ 505 RepoAt: f.RepoAt(), 506 LabelAt: syntax.ATURI(l), 507 }) 508 if err != nil { 509 fail("Failed to subscribe to label.", err) 510 return 511 } 512 } 513 514 if err := tx.Commit(); err != nil { 515 fail("Failed to subscribe to label.", err) 516 return 517 } 518 519 // everything succeeded 520 rp.pages.HxRefresh(w) 521} 522 523func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) { 524 user := rp.oauth.GetUser(r) 525 l := rp.logger.With("handler", "UnsubscribeLabel") 526 l = l.With("did", user.Did) 527 528 f, err := rp.repoResolver.Resolve(r) 529 if err != nil { 530 l.Error("failed to get repo and knot", "err", err) 531 return 532 } 533 534 if err := r.ParseForm(); err != nil { 535 l.Error("invalid form", "err", err) 536 return 537 } 538 539 errorId := "default-label-operation" 540 fail := func(msg string, err error) { 541 l.Error(msg, "err", err) 542 rp.pages.Notice(w, errorId, msg) 543 } 544 545 labelAts := r.Form["label"] 546 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 547 if err != nil { 548 fail("Failed to unsubscribe to label.", err) 549 return 550 } 551 552 // update repo record to remove the label reference 553 newRepo := *f 554 var updated []string 555 for _, l := range newRepo.Labels { 556 if !slices.Contains(labelAts, l) { 557 updated = append(updated, l) 558 } 559 } 560 newRepo.Labels = updated 561 repoRecord := newRepo.AsRecord() 562 563 client, err := rp.oauth.AuthorizedClient(r) 564 if err != nil { 565 fail(err.Error(), err) 566 return 567 } 568 569 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 570 if err != nil { 571 fail("Failed to update labels, no record found on PDS.", err) 572 return 573 } 574 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 575 Collection: tangled.RepoNSID, 576 Repo: newRepo.Did, 577 Rkey: newRepo.Rkey, 578 SwapRecord: ex.Cid, 579 Record: &lexutil.LexiconTypeDecoder{ 580 Val: &repoRecord, 581 }, 582 }) 583 584 err = db.UnsubscribeLabel( 585 rp.db, 586 orm.FilterEq("repo_at", f.RepoAt()), 587 orm.FilterIn("label_at", labelAts), 588 ) 589 if err != nil { 590 fail("Failed to unsubscribe label.", err) 591 return 592 } 593 594 // everything succeeded 595 rp.pages.HxRefresh(w) 596} 597 598func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) { 599 l := rp.logger.With("handler", "LabelPanel") 600 601 f, err := rp.repoResolver.Resolve(r) 602 if err != nil { 603 l.Error("failed to get repo and knot", "err", err) 604 return 605 } 606 607 subjectStr := r.FormValue("subject") 608 subject, err := syntax.ParseATURI(subjectStr) 609 if err != nil { 610 l.Error("failed to get repo and knot", "err", err) 611 return 612 } 613 614 labelDefs, err := db.GetLabelDefinitions( 615 rp.db, 616 orm.FilterIn("at_uri", f.Labels), 617 orm.FilterContains("scope", subject.Collection().String()), 618 ) 619 if err != nil { 620 l.Error("failed to fetch label defs", "err", err) 621 return 622 } 623 624 defs := make(map[string]*models.LabelDefinition) 625 for _, l := range labelDefs { 626 defs[l.AtUri().String()] = &l 627 } 628 629 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 630 if err != nil { 631 l.Error("failed to build label state", "err", err) 632 return 633 } 634 state := states[subject] 635 636 user := rp.oauth.GetUser(r) 637 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 638 LoggedInUser: user, 639 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 640 Defs: defs, 641 Subject: subject.String(), 642 State: state, 643 }) 644} 645 646func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 647 l := rp.logger.With("handler", "EditLabelPanel") 648 649 f, err := rp.repoResolver.Resolve(r) 650 if err != nil { 651 l.Error("failed to get repo and knot", "err", err) 652 return 653 } 654 655 subjectStr := r.FormValue("subject") 656 subject, err := syntax.ParseATURI(subjectStr) 657 if err != nil { 658 l.Error("failed to get repo and knot", "err", err) 659 return 660 } 661 662 labelDefs, err := db.GetLabelDefinitions( 663 rp.db, 664 orm.FilterIn("at_uri", f.Labels), 665 orm.FilterContains("scope", subject.Collection().String()), 666 ) 667 if err != nil { 668 l.Error("failed to fetch labels", "err", err) 669 return 670 } 671 672 defs := make(map[string]*models.LabelDefinition) 673 for _, l := range labelDefs { 674 defs[l.AtUri().String()] = &l 675 } 676 677 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 678 if err != nil { 679 l.Error("failed to build label state", "err", err) 680 return 681 } 682 state := states[subject] 683 684 user := rp.oauth.GetUser(r) 685 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 686 LoggedInUser: user, 687 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 688 Defs: defs, 689 Subject: subject.String(), 690 State: state, 691 }) 692} 693 694func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) { 695 user := rp.oauth.GetUser(r) 696 l := rp.logger.With("handler", "AddCollaborator") 697 l = l.With("did", user.Did) 698 699 f, err := rp.repoResolver.Resolve(r) 700 if err != nil { 701 l.Error("failed to get repo and knot", "err", err) 702 return 703 } 704 705 errorId := "add-collaborator-error" 706 fail := func(msg string, err error) { 707 l.Error(msg, "err", err) 708 rp.pages.Notice(w, errorId, msg) 709 } 710 711 collaborator := r.FormValue("collaborator") 712 if collaborator == "" { 713 fail("Invalid form.", nil) 714 return 715 } 716 717 // remove a single leading `@`, to make @handle work with ResolveIdent 718 collaborator = strings.TrimPrefix(collaborator, "@") 719 720 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 721 if err != nil { 722 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 723 return 724 } 725 726 if collaboratorIdent.DID.String() == user.Did { 727 fail("You seem to be adding yourself as a collaborator.", nil) 728 return 729 } 730 l = l.With("collaborator", collaboratorIdent.Handle) 731 l = l.With("knot", f.Knot) 732 733 // announce this relation into the firehose, store into owners' pds 734 client, err := rp.oauth.AuthorizedClient(r) 735 if err != nil { 736 fail("Failed to write to PDS.", err) 737 return 738 } 739 740 // emit a record 741 currentUser := rp.oauth.GetUser(r) 742 rkey := tid.TID() 743 createdAt := time.Now() 744 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 745 Collection: tangled.RepoCollaboratorNSID, 746 Repo: currentUser.Did, 747 Rkey: rkey, 748 Record: &lexutil.LexiconTypeDecoder{ 749 Val: &tangled.RepoCollaborator{ 750 Subject: collaboratorIdent.DID.String(), 751 Repo: string(f.RepoAt()), 752 CreatedAt: createdAt.Format(time.RFC3339), 753 }}, 754 }) 755 // invalid record 756 if err != nil { 757 fail("Failed to write record to PDS.", err) 758 return 759 } 760 761 aturi := resp.Uri 762 l = l.With("at-uri", aturi) 763 l.Info("wrote record to PDS") 764 765 tx, err := rp.db.BeginTx(r.Context(), nil) 766 if err != nil { 767 fail("Failed to add collaborator.", err) 768 return 769 } 770 771 rollback := func() { 772 err1 := tx.Rollback() 773 err2 := rp.enforcer.E.LoadPolicy() 774 err3 := rollbackRecord(context.Background(), aturi, client) 775 776 // ignore txn complete errors, this is okay 777 if errors.Is(err1, sql.ErrTxDone) { 778 err1 = nil 779 } 780 781 if errs := errors.Join(err1, err2, err3); errs != nil { 782 l.Error("failed to rollback changes", "errs", errs) 783 return 784 } 785 } 786 defer rollback() 787 788 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo()) 789 if err != nil { 790 fail("Failed to add collaborator permissions.", err) 791 return 792 } 793 794 err = db.AddCollaborator(tx, models.Collaborator{ 795 Did: syntax.DID(currentUser.Did), 796 Rkey: rkey, 797 SubjectDid: collaboratorIdent.DID, 798 RepoAt: f.RepoAt(), 799 Created: createdAt, 800 }) 801 if err != nil { 802 fail("Failed to add collaborator.", err) 803 return 804 } 805 806 err = tx.Commit() 807 if err != nil { 808 fail("Failed to add collaborator.", err) 809 return 810 } 811 812 err = rp.enforcer.E.SavePolicy() 813 if err != nil { 814 fail("Failed to update collaborator permissions.", err) 815 return 816 } 817 818 // clear aturi to when everything is successful 819 aturi = "" 820 821 rp.pages.HxRefresh(w) 822} 823 824func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) { 825 user := rp.oauth.GetUser(r) 826 l := rp.logger.With("handler", "DeleteRepo") 827 828 noticeId := "operation-error" 829 f, err := rp.repoResolver.Resolve(r) 830 if err != nil { 831 l.Error("failed to get repo and knot", "err", err) 832 return 833 } 834 835 // remove record from pds 836 atpClient, err := rp.oauth.AuthorizedClient(r) 837 if err != nil { 838 l.Error("failed to get authorized client", "err", err) 839 return 840 } 841 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{ 842 Collection: tangled.RepoNSID, 843 Repo: user.Did, 844 Rkey: f.Rkey, 845 }) 846 if err != nil { 847 l.Error("failed to delete record", "err", err) 848 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.") 849 return 850 } 851 l.Info("removed repo record", "aturi", f.RepoAt().String()) 852 853 client, err := rp.oauth.ServiceClient( 854 r, 855 oauth.WithService(f.Knot), 856 oauth.WithLxm(tangled.RepoDeleteNSID), 857 oauth.WithDev(rp.config.Core.Dev), 858 ) 859 if err != nil { 860 l.Error("failed to connect to knot server", "err", err) 861 return 862 } 863 864 err = tangled.RepoDelete( 865 r.Context(), 866 client, 867 &tangled.RepoDelete_Input{ 868 Did: f.Did, 869 Name: f.Name, 870 Rkey: f.Rkey, 871 }, 872 ) 873 if err := xrpcclient.HandleXrpcErr(err); err != nil { 874 rp.pages.Notice(w, noticeId, err.Error()) 875 return 876 } 877 l.Info("deleted repo from knot") 878 879 tx, err := rp.db.BeginTx(r.Context(), nil) 880 if err != nil { 881 l.Error("failed to start tx") 882 w.Write(fmt.Append(nil, "failed to add collaborator: ", err)) 883 return 884 } 885 defer func() { 886 tx.Rollback() 887 err = rp.enforcer.E.LoadPolicy() 888 if err != nil { 889 l.Error("failed to rollback policies") 890 } 891 }() 892 893 // remove collaborator RBAC 894 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot) 895 if err != nil { 896 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 897 return 898 } 899 for _, c := range repoCollaborators { 900 did := c[0] 901 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo()) 902 } 903 l.Info("removed collaborators") 904 905 // remove repo RBAC 906 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo()) 907 if err != nil { 908 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 909 return 910 } 911 912 // remove repo from db 913 err = db.RemoveRepo(tx, f.Did, f.Name) 914 if err != nil { 915 rp.pages.Notice(w, noticeId, "Failed to update appview") 916 return 917 } 918 l.Info("removed repo from db") 919 920 err = tx.Commit() 921 if err != nil { 922 l.Error("failed to commit changes", "err", err) 923 http.Error(w, err.Error(), http.StatusInternalServerError) 924 return 925 } 926 927 err = rp.enforcer.E.SavePolicy() 928 if err != nil { 929 l.Error("failed to update ACLs", "err", err) 930 http.Error(w, err.Error(), http.StatusInternalServerError) 931 return 932 } 933 934 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did)) 935} 936 937func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 938 l := rp.logger.With("handler", "SyncRepoFork") 939 940 ref := chi.URLParam(r, "ref") 941 ref, _ = url.PathUnescape(ref) 942 943 user := rp.oauth.GetUser(r) 944 f, err := rp.repoResolver.Resolve(r) 945 if err != nil { 946 l.Error("failed to resolve source repo", "err", err) 947 return 948 } 949 950 switch r.Method { 951 case http.MethodPost: 952 client, err := rp.oauth.ServiceClient( 953 r, 954 oauth.WithService(f.Knot), 955 oauth.WithLxm(tangled.RepoForkSyncNSID), 956 oauth.WithDev(rp.config.Core.Dev), 957 ) 958 if err != nil { 959 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 960 return 961 } 962 963 if f.Source == "" { 964 rp.pages.Notice(w, "repo", "This repository is not a fork.") 965 return 966 } 967 968 err = tangled.RepoForkSync( 969 r.Context(), 970 client, 971 &tangled.RepoForkSync_Input{ 972 Did: user.Did, 973 Name: f.Name, 974 Source: f.Source, 975 Branch: ref, 976 }, 977 ) 978 if err := xrpcclient.HandleXrpcErr(err); err != nil { 979 rp.pages.Notice(w, "repo", err.Error()) 980 return 981 } 982 983 rp.pages.HxRefresh(w) 984 return 985 } 986} 987 988func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 989 l := rp.logger.With("handler", "ForkRepo") 990 991 user := rp.oauth.GetUser(r) 992 f, err := rp.repoResolver.Resolve(r) 993 if err != nil { 994 l.Error("failed to resolve source repo", "err", err) 995 return 996 } 997 998 switch r.Method { 999 case http.MethodGet: 1000 user := rp.oauth.GetUser(r) 1001 knots, err := rp.enforcer.GetKnotsForUser(user.Did) 1002 if err != nil { 1003 rp.pages.Notice(w, "repo", "Invalid user account.") 1004 return 1005 } 1006 1007 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1008 LoggedInUser: user, 1009 Knots: knots, 1010 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1011 }) 1012 1013 case http.MethodPost: 1014 l := rp.logger.With("handler", "ForkRepo") 1015 1016 targetKnot := r.FormValue("knot") 1017 if targetKnot == "" { 1018 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.") 1019 return 1020 } 1021 l = l.With("targetKnot", targetKnot) 1022 1023 ok, err := rp.enforcer.E.Enforce(user.Did, targetKnot, targetKnot, "repo:create") 1024 if err != nil || !ok { 1025 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1026 return 1027 } 1028 1029 // choose a name for a fork 1030 forkName := r.FormValue("repo_name") 1031 if forkName == "" { 1032 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1033 return 1034 } 1035 1036 // this check is *only* to see if the forked repo name already exists 1037 // in the user's account. 1038 existingRepo, err := db.GetRepo( 1039 rp.db, 1040 orm.FilterEq("did", user.Did), 1041 orm.FilterEq("name", forkName), 1042 ) 1043 if err != nil { 1044 if !errors.Is(err, sql.ErrNoRows) { 1045 l.Error("error fetching existing repo from db", "err", err) 1046 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.") 1047 return 1048 } 1049 } else if existingRepo != nil { 1050 // repo with this name already exists 1051 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1052 return 1053 } 1054 l = l.With("forkName", forkName) 1055 1056 uri := "https" 1057 if rp.config.Core.Dev { 1058 uri = "http" 1059 } 1060 1061 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name) 1062 l = l.With("cloneUrl", forkSourceUrl) 1063 1064 sourceAt := f.RepoAt().String() 1065 1066 // create an atproto record for this fork 1067 rkey := tid.TID() 1068 repo := &models.Repo{ 1069 Did: user.Did, 1070 Name: forkName, 1071 Knot: targetKnot, 1072 Rkey: rkey, 1073 Source: sourceAt, 1074 Description: f.Description, 1075 Created: time.Now(), 1076 Labels: rp.config.Label.DefaultLabelDefs, 1077 } 1078 record := repo.AsRecord() 1079 1080 atpClient, err := rp.oauth.AuthorizedClient(r) 1081 if err != nil { 1082 l.Error("failed to create xrpcclient", "err", err) 1083 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1084 return 1085 } 1086 1087 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1088 Collection: tangled.RepoNSID, 1089 Repo: user.Did, 1090 Rkey: rkey, 1091 Record: &lexutil.LexiconTypeDecoder{ 1092 Val: &record, 1093 }, 1094 }) 1095 if err != nil { 1096 l.Error("failed to write to PDS", "err", err) 1097 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1098 return 1099 } 1100 1101 aturi := atresp.Uri 1102 l = l.With("aturi", aturi) 1103 l.Info("wrote to PDS") 1104 1105 tx, err := rp.db.BeginTx(r.Context(), nil) 1106 if err != nil { 1107 l.Info("txn failed", "err", err) 1108 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1109 return 1110 } 1111 1112 // The rollback function reverts a few things on failure: 1113 // - the pending txn 1114 // - the ACLs 1115 // - the atproto record created 1116 rollback := func() { 1117 err1 := tx.Rollback() 1118 err2 := rp.enforcer.E.LoadPolicy() 1119 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1120 1121 // ignore txn complete errors, this is okay 1122 if errors.Is(err1, sql.ErrTxDone) { 1123 err1 = nil 1124 } 1125 1126 if errs := errors.Join(err1, err2, err3); errs != nil { 1127 l.Error("failed to rollback changes", "errs", errs) 1128 return 1129 } 1130 } 1131 defer rollback() 1132 1133 // TODO: this could coordinate better with the knot to recieve a clone status 1134 client, err := rp.oauth.ServiceClient( 1135 r, 1136 oauth.WithService(targetKnot), 1137 oauth.WithLxm(tangled.RepoCreateNSID), 1138 oauth.WithDev(rp.config.Core.Dev), 1139 oauth.WithTimeout(time.Second*20), // big repos take time to clone 1140 ) 1141 if err != nil { 1142 l.Error("could not create service client", "err", err) 1143 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1144 return 1145 } 1146 1147 err = tangled.RepoCreate( 1148 r.Context(), 1149 client, 1150 &tangled.RepoCreate_Input{ 1151 Rkey: rkey, 1152 Source: &forkSourceUrl, 1153 }, 1154 ) 1155 if err := xrpcclient.HandleXrpcErr(err); err != nil { 1156 rp.pages.Notice(w, "repo", err.Error()) 1157 return 1158 } 1159 1160 err = db.AddRepo(tx, repo) 1161 if err != nil { 1162 l.Error("failed to AddRepo", "err", err) 1163 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1164 return 1165 } 1166 1167 // acls 1168 p, _ := securejoin.SecureJoin(user.Did, forkName) 1169 err = rp.enforcer.AddRepo(user.Did, targetKnot, p) 1170 if err != nil { 1171 l.Error("failed to add ACLs", "err", err) 1172 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") 1173 return 1174 } 1175 1176 err = tx.Commit() 1177 if err != nil { 1178 l.Error("failed to commit changes", "err", err) 1179 http.Error(w, err.Error(), http.StatusInternalServerError) 1180 return 1181 } 1182 1183 err = rp.enforcer.E.SavePolicy() 1184 if err != nil { 1185 l.Error("failed to update ACLs", "err", err) 1186 http.Error(w, err.Error(), http.StatusInternalServerError) 1187 return 1188 } 1189 1190 // reset the ATURI because the transaction completed successfully 1191 aturi = "" 1192 1193 rp.notifier.NewRepo(r.Context(), repo) 1194 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName)) 1195 } 1196} 1197 1198// this is used to rollback changes made to the PDS 1199// 1200// it is a no-op if the provided ATURI is empty 1201func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 1202 if aturi == "" { 1203 return nil 1204 } 1205 1206 parsed := syntax.ATURI(aturi) 1207 1208 collection := parsed.Collection().String() 1209 repo := parsed.Authority().String() 1210 rkey := parsed.RecordKey().String() 1211 1212 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1213 Collection: collection, 1214 Repo: repo, 1215 Rkey: rkey, 1216 }) 1217 return err 1218}