forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/appview/notify"
20 "tangled.org/core/appview/oauth"
21 "tangled.org/core/appview/pages"
22 "tangled.org/core/appview/reporesolver"
23 "tangled.org/core/appview/validator"
24 xrpcclient "tangled.org/core/appview/xrpcclient"
25 "tangled.org/core/eventconsumer"
26 "tangled.org/core/idresolver"
27 "tangled.org/core/orm"
28 "tangled.org/core/rbac"
29 "tangled.org/core/tid"
30 "tangled.org/core/xrpc/serviceauth"
31
32 comatproto "github.com/bluesky-social/indigo/api/atproto"
33 atpclient "github.com/bluesky-social/indigo/atproto/client"
34 "github.com/bluesky-social/indigo/atproto/syntax"
35 lexutil "github.com/bluesky-social/indigo/lex/util"
36 securejoin "github.com/cyphar/filepath-securejoin"
37 "github.com/go-chi/chi/v5"
38)
39
40type Repo struct {
41 repoResolver *reporesolver.RepoResolver
42 idResolver *idresolver.Resolver
43 config *config.Config
44 oauth *oauth.OAuth
45 pages *pages.Pages
46 spindlestream *eventconsumer.Consumer
47 db *db.DB
48 enforcer *rbac.Enforcer
49 notifier notify.Notifier
50 logger *slog.Logger
51 serviceAuth *serviceauth.ServiceAuth
52 validator *validator.Validator
53}
54
55func New(
56 oauth *oauth.OAuth,
57 repoResolver *reporesolver.RepoResolver,
58 pages *pages.Pages,
59 spindlestream *eventconsumer.Consumer,
60 idResolver *idresolver.Resolver,
61 db *db.DB,
62 config *config.Config,
63 notifier notify.Notifier,
64 enforcer *rbac.Enforcer,
65 logger *slog.Logger,
66 validator *validator.Validator,
67) *Repo {
68 return &Repo{oauth: oauth,
69 repoResolver: repoResolver,
70 pages: pages,
71 idResolver: idResolver,
72 config: config,
73 spindlestream: spindlestream,
74 db: db,
75 notifier: notifier,
76 enforcer: enforcer,
77 logger: logger,
78 validator: validator,
79 }
80}
81
82// modify the spindle configured for this repo
83func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
84 user := rp.oauth.GetUser(r)
85 l := rp.logger.With("handler", "EditSpindle")
86 l = l.With("did", user.Did)
87
88 errorId := "operation-error"
89 fail := func(msg string, err error) {
90 l.Error(msg, "err", err)
91 rp.pages.Notice(w, errorId, msg)
92 }
93
94 f, err := rp.repoResolver.Resolve(r)
95 if err != nil {
96 fail("Failed to resolve repo. Try again later", err)
97 return
98 }
99
100 newSpindle := r.FormValue("spindle")
101 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
102 client, err := rp.oauth.AuthorizedClient(r)
103 if err != nil {
104 fail("Failed to authorize. Try again later.", err)
105 return
106 }
107
108 if !removingSpindle {
109 // ensure that this is a valid spindle for this user
110 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did)
111 if err != nil {
112 fail("Failed to find spindles. Try again later.", err)
113 return
114 }
115
116 if !slices.Contains(validSpindles, newSpindle) {
117 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
118 return
119 }
120 }
121
122 newRepo := *f
123 newRepo.Spindle = newSpindle
124 record := newRepo.AsRecord()
125
126 spindlePtr := &newSpindle
127 if removingSpindle {
128 spindlePtr = nil
129 newRepo.Spindle = ""
130 }
131
132 // optimistic update
133 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr)
134 if err != nil {
135 fail("Failed to update spindle. Try again later.", err)
136 return
137 }
138
139 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
140 if err != nil {
141 fail("Failed to update spindle, no record found on PDS.", err)
142 return
143 }
144 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
145 Collection: tangled.RepoNSID,
146 Repo: newRepo.Did,
147 Rkey: newRepo.Rkey,
148 SwapRecord: ex.Cid,
149 Record: &lexutil.LexiconTypeDecoder{
150 Val: &record,
151 },
152 })
153
154 if err != nil {
155 fail("Failed to update spindle, unable to save to PDS.", err)
156 return
157 }
158
159 if !removingSpindle {
160 // add this spindle to spindle stream
161 rp.spindlestream.AddSource(
162 context.Background(),
163 eventconsumer.NewSpindleSource(newSpindle),
164 )
165 }
166
167 rp.pages.HxRefresh(w)
168}
169
170func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
171 user := rp.oauth.GetUser(r)
172 l := rp.logger.With("handler", "AddLabel")
173 l = l.With("did", user.Did)
174
175 f, err := rp.repoResolver.Resolve(r)
176 if err != nil {
177 l.Error("failed to get repo and knot", "err", err)
178 return
179 }
180
181 errorId := "add-label-error"
182 fail := func(msg string, err error) {
183 l.Error(msg, "err", err)
184 rp.pages.Notice(w, errorId, msg)
185 }
186
187 // get form values for label definition
188 name := r.FormValue("name")
189 concreteType := r.FormValue("valueType")
190 valueFormat := r.FormValue("valueFormat")
191 enumValues := r.FormValue("enumValues")
192 scope := r.Form["scope"]
193 color := r.FormValue("color")
194 multiple := r.FormValue("multiple") == "true"
195
196 var variants []string
197 for part := range strings.SplitSeq(enumValues, ",") {
198 if part = strings.TrimSpace(part); part != "" {
199 variants = append(variants, part)
200 }
201 }
202
203 if concreteType == "" {
204 concreteType = "null"
205 }
206
207 format := models.ValueTypeFormatAny
208 if valueFormat == "did" {
209 format = models.ValueTypeFormatDid
210 }
211
212 valueType := models.ValueType{
213 Type: models.ConcreteType(concreteType),
214 Format: format,
215 Enum: variants,
216 }
217
218 label := models.LabelDefinition{
219 Did: user.Did,
220 Rkey: tid.TID(),
221 Name: name,
222 ValueType: valueType,
223 Scope: scope,
224 Color: &color,
225 Multiple: multiple,
226 Created: time.Now(),
227 }
228 if err := rp.validator.ValidateLabelDefinition(&label); err != nil {
229 fail(err.Error(), err)
230 return
231 }
232
233 // announce this relation into the firehose, store into owners' pds
234 client, err := rp.oauth.AuthorizedClient(r)
235 if err != nil {
236 fail(err.Error(), err)
237 return
238 }
239
240 // emit a labelRecord
241 labelRecord := label.AsRecord()
242 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
243 Collection: tangled.LabelDefinitionNSID,
244 Repo: label.Did,
245 Rkey: label.Rkey,
246 Record: &lexutil.LexiconTypeDecoder{
247 Val: &labelRecord,
248 },
249 })
250 // invalid record
251 if err != nil {
252 fail("Failed to write record to PDS.", err)
253 return
254 }
255
256 aturi := resp.Uri
257 l = l.With("at-uri", aturi)
258 l.Info("wrote label record to PDS")
259
260 // update the repo to subscribe to this label
261 newRepo := *f
262 newRepo.Labels = append(newRepo.Labels, aturi)
263 repoRecord := newRepo.AsRecord()
264
265 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
266 if err != nil {
267 fail("Failed to update labels, no record found on PDS.", err)
268 return
269 }
270 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
271 Collection: tangled.RepoNSID,
272 Repo: newRepo.Did,
273 Rkey: newRepo.Rkey,
274 SwapRecord: ex.Cid,
275 Record: &lexutil.LexiconTypeDecoder{
276 Val: &repoRecord,
277 },
278 })
279 if err != nil {
280 fail("Failed to update labels for repo.", err)
281 return
282 }
283
284 tx, err := rp.db.BeginTx(r.Context(), nil)
285 if err != nil {
286 fail("Failed to add label.", err)
287 return
288 }
289
290 rollback := func() {
291 err1 := tx.Rollback()
292 err2 := rollbackRecord(context.Background(), aturi, client)
293
294 // ignore txn complete errors, this is okay
295 if errors.Is(err1, sql.ErrTxDone) {
296 err1 = nil
297 }
298
299 if errs := errors.Join(err1, err2); errs != nil {
300 l.Error("failed to rollback changes", "errs", errs)
301 return
302 }
303 }
304 defer rollback()
305
306 _, err = db.AddLabelDefinition(tx, &label)
307 if err != nil {
308 fail("Failed to add label.", err)
309 return
310 }
311
312 err = db.SubscribeLabel(tx, &models.RepoLabel{
313 RepoAt: f.RepoAt(),
314 LabelAt: label.AtUri(),
315 })
316
317 err = tx.Commit()
318 if err != nil {
319 fail("Failed to add label.", err)
320 return
321 }
322
323 // clear aturi when everything is successful
324 aturi = ""
325
326 rp.pages.HxRefresh(w)
327}
328
329func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
330 user := rp.oauth.GetUser(r)
331 l := rp.logger.With("handler", "DeleteLabel")
332 l = l.With("did", user.Did)
333
334 f, err := rp.repoResolver.Resolve(r)
335 if err != nil {
336 l.Error("failed to get repo and knot", "err", err)
337 return
338 }
339
340 errorId := "label-operation"
341 fail := func(msg string, err error) {
342 l.Error(msg, "err", err)
343 rp.pages.Notice(w, errorId, msg)
344 }
345
346 // get form values
347 labelId := r.FormValue("label-id")
348
349 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId))
350 if err != nil {
351 fail("Failed to find label definition.", err)
352 return
353 }
354
355 client, err := rp.oauth.AuthorizedClient(r)
356 if err != nil {
357 fail(err.Error(), err)
358 return
359 }
360
361 // delete label record from PDS
362 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
363 Collection: tangled.LabelDefinitionNSID,
364 Repo: label.Did,
365 Rkey: label.Rkey,
366 })
367 if err != nil {
368 fail("Failed to delete label record from PDS.", err)
369 return
370 }
371
372 // update repo record to remove the label reference
373 newRepo := *f
374 var updated []string
375 removedAt := label.AtUri().String()
376 for _, l := range newRepo.Labels {
377 if l != removedAt {
378 updated = append(updated, l)
379 }
380 }
381 newRepo.Labels = updated
382 repoRecord := newRepo.AsRecord()
383
384 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
385 if err != nil {
386 fail("Failed to update labels, no record found on PDS.", err)
387 return
388 }
389 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
390 Collection: tangled.RepoNSID,
391 Repo: newRepo.Did,
392 Rkey: newRepo.Rkey,
393 SwapRecord: ex.Cid,
394 Record: &lexutil.LexiconTypeDecoder{
395 Val: &repoRecord,
396 },
397 })
398 if err != nil {
399 fail("Failed to update repo record.", err)
400 return
401 }
402
403 // transaction for DB changes
404 tx, err := rp.db.BeginTx(r.Context(), nil)
405 if err != nil {
406 fail("Failed to delete label.", err)
407 return
408 }
409 defer tx.Rollback()
410
411 err = db.UnsubscribeLabel(
412 tx,
413 orm.FilterEq("repo_at", f.RepoAt()),
414 orm.FilterEq("label_at", removedAt),
415 )
416 if err != nil {
417 fail("Failed to unsubscribe label.", err)
418 return
419 }
420
421 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id))
422 if err != nil {
423 fail("Failed to delete label definition.", err)
424 return
425 }
426
427 err = tx.Commit()
428 if err != nil {
429 fail("Failed to delete label.", err)
430 return
431 }
432
433 // everything succeeded
434 rp.pages.HxRefresh(w)
435}
436
437func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
438 user := rp.oauth.GetUser(r)
439 l := rp.logger.With("handler", "SubscribeLabel")
440 l = l.With("did", user.Did)
441
442 f, err := rp.repoResolver.Resolve(r)
443 if err != nil {
444 l.Error("failed to get repo and knot", "err", err)
445 return
446 }
447
448 if err := r.ParseForm(); err != nil {
449 l.Error("invalid form", "err", err)
450 return
451 }
452
453 errorId := "default-label-operation"
454 fail := func(msg string, err error) {
455 l.Error(msg, "err", err)
456 rp.pages.Notice(w, errorId, msg)
457 }
458
459 labelAts := r.Form["label"]
460 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
461 if err != nil {
462 fail("Failed to subscribe to label.", err)
463 return
464 }
465
466 newRepo := *f
467 newRepo.Labels = append(newRepo.Labels, labelAts...)
468
469 // dedup
470 slices.Sort(newRepo.Labels)
471 newRepo.Labels = slices.Compact(newRepo.Labels)
472
473 repoRecord := newRepo.AsRecord()
474
475 client, err := rp.oauth.AuthorizedClient(r)
476 if err != nil {
477 fail(err.Error(), err)
478 return
479 }
480
481 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
482 if err != nil {
483 fail("Failed to update labels, no record found on PDS.", err)
484 return
485 }
486 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
487 Collection: tangled.RepoNSID,
488 Repo: newRepo.Did,
489 Rkey: newRepo.Rkey,
490 SwapRecord: ex.Cid,
491 Record: &lexutil.LexiconTypeDecoder{
492 Val: &repoRecord,
493 },
494 })
495
496 tx, err := rp.db.Begin()
497 if err != nil {
498 fail("Failed to subscribe to label.", err)
499 return
500 }
501 defer tx.Rollback()
502
503 for _, l := range labelAts {
504 err = db.SubscribeLabel(tx, &models.RepoLabel{
505 RepoAt: f.RepoAt(),
506 LabelAt: syntax.ATURI(l),
507 })
508 if err != nil {
509 fail("Failed to subscribe to label.", err)
510 return
511 }
512 }
513
514 if err := tx.Commit(); err != nil {
515 fail("Failed to subscribe to label.", err)
516 return
517 }
518
519 // everything succeeded
520 rp.pages.HxRefresh(w)
521}
522
523func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
524 user := rp.oauth.GetUser(r)
525 l := rp.logger.With("handler", "UnsubscribeLabel")
526 l = l.With("did", user.Did)
527
528 f, err := rp.repoResolver.Resolve(r)
529 if err != nil {
530 l.Error("failed to get repo and knot", "err", err)
531 return
532 }
533
534 if err := r.ParseForm(); err != nil {
535 l.Error("invalid form", "err", err)
536 return
537 }
538
539 errorId := "default-label-operation"
540 fail := func(msg string, err error) {
541 l.Error(msg, "err", err)
542 rp.pages.Notice(w, errorId, msg)
543 }
544
545 labelAts := r.Form["label"]
546 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
547 if err != nil {
548 fail("Failed to unsubscribe to label.", err)
549 return
550 }
551
552 // update repo record to remove the label reference
553 newRepo := *f
554 var updated []string
555 for _, l := range newRepo.Labels {
556 if !slices.Contains(labelAts, l) {
557 updated = append(updated, l)
558 }
559 }
560 newRepo.Labels = updated
561 repoRecord := newRepo.AsRecord()
562
563 client, err := rp.oauth.AuthorizedClient(r)
564 if err != nil {
565 fail(err.Error(), err)
566 return
567 }
568
569 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
570 if err != nil {
571 fail("Failed to update labels, no record found on PDS.", err)
572 return
573 }
574 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
575 Collection: tangled.RepoNSID,
576 Repo: newRepo.Did,
577 Rkey: newRepo.Rkey,
578 SwapRecord: ex.Cid,
579 Record: &lexutil.LexiconTypeDecoder{
580 Val: &repoRecord,
581 },
582 })
583
584 err = db.UnsubscribeLabel(
585 rp.db,
586 orm.FilterEq("repo_at", f.RepoAt()),
587 orm.FilterIn("label_at", labelAts),
588 )
589 if err != nil {
590 fail("Failed to unsubscribe label.", err)
591 return
592 }
593
594 // everything succeeded
595 rp.pages.HxRefresh(w)
596}
597
598func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
599 l := rp.logger.With("handler", "LabelPanel")
600
601 f, err := rp.repoResolver.Resolve(r)
602 if err != nil {
603 l.Error("failed to get repo and knot", "err", err)
604 return
605 }
606
607 subjectStr := r.FormValue("subject")
608 subject, err := syntax.ParseATURI(subjectStr)
609 if err != nil {
610 l.Error("failed to get repo and knot", "err", err)
611 return
612 }
613
614 labelDefs, err := db.GetLabelDefinitions(
615 rp.db,
616 orm.FilterIn("at_uri", f.Labels),
617 orm.FilterContains("scope", subject.Collection().String()),
618 )
619 if err != nil {
620 l.Error("failed to fetch label defs", "err", err)
621 return
622 }
623
624 defs := make(map[string]*models.LabelDefinition)
625 for _, l := range labelDefs {
626 defs[l.AtUri().String()] = &l
627 }
628
629 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
630 if err != nil {
631 l.Error("failed to build label state", "err", err)
632 return
633 }
634 state := states[subject]
635
636 user := rp.oauth.GetUser(r)
637 rp.pages.LabelPanel(w, pages.LabelPanelParams{
638 LoggedInUser: user,
639 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
640 Defs: defs,
641 Subject: subject.String(),
642 State: state,
643 })
644}
645
646func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
647 l := rp.logger.With("handler", "EditLabelPanel")
648
649 f, err := rp.repoResolver.Resolve(r)
650 if err != nil {
651 l.Error("failed to get repo and knot", "err", err)
652 return
653 }
654
655 subjectStr := r.FormValue("subject")
656 subject, err := syntax.ParseATURI(subjectStr)
657 if err != nil {
658 l.Error("failed to get repo and knot", "err", err)
659 return
660 }
661
662 labelDefs, err := db.GetLabelDefinitions(
663 rp.db,
664 orm.FilterIn("at_uri", f.Labels),
665 orm.FilterContains("scope", subject.Collection().String()),
666 )
667 if err != nil {
668 l.Error("failed to fetch labels", "err", err)
669 return
670 }
671
672 defs := make(map[string]*models.LabelDefinition)
673 for _, l := range labelDefs {
674 defs[l.AtUri().String()] = &l
675 }
676
677 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
678 if err != nil {
679 l.Error("failed to build label state", "err", err)
680 return
681 }
682 state := states[subject]
683
684 user := rp.oauth.GetUser(r)
685 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
686 LoggedInUser: user,
687 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
688 Defs: defs,
689 Subject: subject.String(),
690 State: state,
691 })
692}
693
694func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
695 user := rp.oauth.GetUser(r)
696 l := rp.logger.With("handler", "AddCollaborator")
697 l = l.With("did", user.Did)
698
699 f, err := rp.repoResolver.Resolve(r)
700 if err != nil {
701 l.Error("failed to get repo and knot", "err", err)
702 return
703 }
704
705 errorId := "add-collaborator-error"
706 fail := func(msg string, err error) {
707 l.Error(msg, "err", err)
708 rp.pages.Notice(w, errorId, msg)
709 }
710
711 collaborator := r.FormValue("collaborator")
712 if collaborator == "" {
713 fail("Invalid form.", nil)
714 return
715 }
716
717 // remove a single leading `@`, to make @handle work with ResolveIdent
718 collaborator = strings.TrimPrefix(collaborator, "@")
719
720 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
721 if err != nil {
722 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
723 return
724 }
725
726 if collaboratorIdent.DID.String() == user.Did {
727 fail("You seem to be adding yourself as a collaborator.", nil)
728 return
729 }
730 l = l.With("collaborator", collaboratorIdent.Handle)
731 l = l.With("knot", f.Knot)
732
733 // announce this relation into the firehose, store into owners' pds
734 client, err := rp.oauth.AuthorizedClient(r)
735 if err != nil {
736 fail("Failed to write to PDS.", err)
737 return
738 }
739
740 // emit a record
741 currentUser := rp.oauth.GetUser(r)
742 rkey := tid.TID()
743 createdAt := time.Now()
744 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
745 Collection: tangled.RepoCollaboratorNSID,
746 Repo: currentUser.Did,
747 Rkey: rkey,
748 Record: &lexutil.LexiconTypeDecoder{
749 Val: &tangled.RepoCollaborator{
750 Subject: collaboratorIdent.DID.String(),
751 Repo: string(f.RepoAt()),
752 CreatedAt: createdAt.Format(time.RFC3339),
753 }},
754 })
755 // invalid record
756 if err != nil {
757 fail("Failed to write record to PDS.", err)
758 return
759 }
760
761 aturi := resp.Uri
762 l = l.With("at-uri", aturi)
763 l.Info("wrote record to PDS")
764
765 tx, err := rp.db.BeginTx(r.Context(), nil)
766 if err != nil {
767 fail("Failed to add collaborator.", err)
768 return
769 }
770
771 rollback := func() {
772 err1 := tx.Rollback()
773 err2 := rp.enforcer.E.LoadPolicy()
774 err3 := rollbackRecord(context.Background(), aturi, client)
775
776 // ignore txn complete errors, this is okay
777 if errors.Is(err1, sql.ErrTxDone) {
778 err1 = nil
779 }
780
781 if errs := errors.Join(err1, err2, err3); errs != nil {
782 l.Error("failed to rollback changes", "errs", errs)
783 return
784 }
785 }
786 defer rollback()
787
788 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo())
789 if err != nil {
790 fail("Failed to add collaborator permissions.", err)
791 return
792 }
793
794 err = db.AddCollaborator(tx, models.Collaborator{
795 Did: syntax.DID(currentUser.Did),
796 Rkey: rkey,
797 SubjectDid: collaboratorIdent.DID,
798 RepoAt: f.RepoAt(),
799 Created: createdAt,
800 })
801 if err != nil {
802 fail("Failed to add collaborator.", err)
803 return
804 }
805
806 err = tx.Commit()
807 if err != nil {
808 fail("Failed to add collaborator.", err)
809 return
810 }
811
812 err = rp.enforcer.E.SavePolicy()
813 if err != nil {
814 fail("Failed to update collaborator permissions.", err)
815 return
816 }
817
818 // clear aturi to when everything is successful
819 aturi = ""
820
821 rp.pages.HxRefresh(w)
822}
823
824func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
825 user := rp.oauth.GetUser(r)
826 l := rp.logger.With("handler", "DeleteRepo")
827
828 noticeId := "operation-error"
829 f, err := rp.repoResolver.Resolve(r)
830 if err != nil {
831 l.Error("failed to get repo and knot", "err", err)
832 return
833 }
834
835 // remove record from pds
836 atpClient, err := rp.oauth.AuthorizedClient(r)
837 if err != nil {
838 l.Error("failed to get authorized client", "err", err)
839 return
840 }
841 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
842 Collection: tangled.RepoNSID,
843 Repo: user.Did,
844 Rkey: f.Rkey,
845 })
846 if err != nil {
847 l.Error("failed to delete record", "err", err)
848 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
849 return
850 }
851 l.Info("removed repo record", "aturi", f.RepoAt().String())
852
853 client, err := rp.oauth.ServiceClient(
854 r,
855 oauth.WithService(f.Knot),
856 oauth.WithLxm(tangled.RepoDeleteNSID),
857 oauth.WithDev(rp.config.Core.Dev),
858 )
859 if err != nil {
860 l.Error("failed to connect to knot server", "err", err)
861 return
862 }
863
864 err = tangled.RepoDelete(
865 r.Context(),
866 client,
867 &tangled.RepoDelete_Input{
868 Did: f.Did,
869 Name: f.Name,
870 Rkey: f.Rkey,
871 },
872 )
873 if err := xrpcclient.HandleXrpcErr(err); err != nil {
874 rp.pages.Notice(w, noticeId, err.Error())
875 return
876 }
877 l.Info("deleted repo from knot")
878
879 tx, err := rp.db.BeginTx(r.Context(), nil)
880 if err != nil {
881 l.Error("failed to start tx")
882 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
883 return
884 }
885 defer func() {
886 tx.Rollback()
887 err = rp.enforcer.E.LoadPolicy()
888 if err != nil {
889 l.Error("failed to rollback policies")
890 }
891 }()
892
893 // remove collaborator RBAC
894 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot)
895 if err != nil {
896 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
897 return
898 }
899 for _, c := range repoCollaborators {
900 did := c[0]
901 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo())
902 }
903 l.Info("removed collaborators")
904
905 // remove repo RBAC
906 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo())
907 if err != nil {
908 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
909 return
910 }
911
912 // remove repo from db
913 err = db.RemoveRepo(tx, f.Did, f.Name)
914 if err != nil {
915 rp.pages.Notice(w, noticeId, "Failed to update appview")
916 return
917 }
918 l.Info("removed repo from db")
919
920 err = tx.Commit()
921 if err != nil {
922 l.Error("failed to commit changes", "err", err)
923 http.Error(w, err.Error(), http.StatusInternalServerError)
924 return
925 }
926
927 err = rp.enforcer.E.SavePolicy()
928 if err != nil {
929 l.Error("failed to update ACLs", "err", err)
930 http.Error(w, err.Error(), http.StatusInternalServerError)
931 return
932 }
933
934 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
935}
936
937func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
938 l := rp.logger.With("handler", "SyncRepoFork")
939
940 ref := chi.URLParam(r, "ref")
941 ref, _ = url.PathUnescape(ref)
942
943 user := rp.oauth.GetUser(r)
944 f, err := rp.repoResolver.Resolve(r)
945 if err != nil {
946 l.Error("failed to resolve source repo", "err", err)
947 return
948 }
949
950 switch r.Method {
951 case http.MethodPost:
952 client, err := rp.oauth.ServiceClient(
953 r,
954 oauth.WithService(f.Knot),
955 oauth.WithLxm(tangled.RepoForkSyncNSID),
956 oauth.WithDev(rp.config.Core.Dev),
957 )
958 if err != nil {
959 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
960 return
961 }
962
963 if f.Source == "" {
964 rp.pages.Notice(w, "repo", "This repository is not a fork.")
965 return
966 }
967
968 err = tangled.RepoForkSync(
969 r.Context(),
970 client,
971 &tangled.RepoForkSync_Input{
972 Did: user.Did,
973 Name: f.Name,
974 Source: f.Source,
975 Branch: ref,
976 },
977 )
978 if err := xrpcclient.HandleXrpcErr(err); err != nil {
979 rp.pages.Notice(w, "repo", err.Error())
980 return
981 }
982
983 rp.pages.HxRefresh(w)
984 return
985 }
986}
987
988func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
989 l := rp.logger.With("handler", "ForkRepo")
990
991 user := rp.oauth.GetUser(r)
992 f, err := rp.repoResolver.Resolve(r)
993 if err != nil {
994 l.Error("failed to resolve source repo", "err", err)
995 return
996 }
997
998 switch r.Method {
999 case http.MethodGet:
1000 user := rp.oauth.GetUser(r)
1001 knots, err := rp.enforcer.GetKnotsForUser(user.Did)
1002 if err != nil {
1003 rp.pages.Notice(w, "repo", "Invalid user account.")
1004 return
1005 }
1006
1007 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1008 LoggedInUser: user,
1009 Knots: knots,
1010 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1011 })
1012
1013 case http.MethodPost:
1014 l := rp.logger.With("handler", "ForkRepo")
1015
1016 targetKnot := r.FormValue("knot")
1017 if targetKnot == "" {
1018 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1019 return
1020 }
1021 l = l.With("targetKnot", targetKnot)
1022
1023 ok, err := rp.enforcer.E.Enforce(user.Did, targetKnot, targetKnot, "repo:create")
1024 if err != nil || !ok {
1025 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1026 return
1027 }
1028
1029 // choose a name for a fork
1030 forkName := r.FormValue("repo_name")
1031 if forkName == "" {
1032 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1033 return
1034 }
1035
1036 // this check is *only* to see if the forked repo name already exists
1037 // in the user's account.
1038 existingRepo, err := db.GetRepo(
1039 rp.db,
1040 orm.FilterEq("did", user.Did),
1041 orm.FilterEq("name", forkName),
1042 )
1043 if err != nil {
1044 if !errors.Is(err, sql.ErrNoRows) {
1045 l.Error("error fetching existing repo from db", "err", err)
1046 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1047 return
1048 }
1049 } else if existingRepo != nil {
1050 // repo with this name already exists
1051 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1052 return
1053 }
1054 l = l.With("forkName", forkName)
1055
1056 uri := "https"
1057 if rp.config.Core.Dev {
1058 uri = "http"
1059 }
1060
1061 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name)
1062 l = l.With("cloneUrl", forkSourceUrl)
1063
1064 sourceAt := f.RepoAt().String()
1065
1066 // create an atproto record for this fork
1067 rkey := tid.TID()
1068 repo := &models.Repo{
1069 Did: user.Did,
1070 Name: forkName,
1071 Knot: targetKnot,
1072 Rkey: rkey,
1073 Source: sourceAt,
1074 Description: f.Description,
1075 Created: time.Now(),
1076 Labels: rp.config.Label.DefaultLabelDefs,
1077 }
1078 record := repo.AsRecord()
1079
1080 atpClient, err := rp.oauth.AuthorizedClient(r)
1081 if err != nil {
1082 l.Error("failed to create xrpcclient", "err", err)
1083 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1084 return
1085 }
1086
1087 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1088 Collection: tangled.RepoNSID,
1089 Repo: user.Did,
1090 Rkey: rkey,
1091 Record: &lexutil.LexiconTypeDecoder{
1092 Val: &record,
1093 },
1094 })
1095 if err != nil {
1096 l.Error("failed to write to PDS", "err", err)
1097 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1098 return
1099 }
1100
1101 aturi := atresp.Uri
1102 l = l.With("aturi", aturi)
1103 l.Info("wrote to PDS")
1104
1105 tx, err := rp.db.BeginTx(r.Context(), nil)
1106 if err != nil {
1107 l.Info("txn failed", "err", err)
1108 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1109 return
1110 }
1111
1112 // The rollback function reverts a few things on failure:
1113 // - the pending txn
1114 // - the ACLs
1115 // - the atproto record created
1116 rollback := func() {
1117 err1 := tx.Rollback()
1118 err2 := rp.enforcer.E.LoadPolicy()
1119 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1120
1121 // ignore txn complete errors, this is okay
1122 if errors.Is(err1, sql.ErrTxDone) {
1123 err1 = nil
1124 }
1125
1126 if errs := errors.Join(err1, err2, err3); errs != nil {
1127 l.Error("failed to rollback changes", "errs", errs)
1128 return
1129 }
1130 }
1131 defer rollback()
1132
1133 // TODO: this could coordinate better with the knot to recieve a clone status
1134 client, err := rp.oauth.ServiceClient(
1135 r,
1136 oauth.WithService(targetKnot),
1137 oauth.WithLxm(tangled.RepoCreateNSID),
1138 oauth.WithDev(rp.config.Core.Dev),
1139 oauth.WithTimeout(time.Second*20), // big repos take time to clone
1140 )
1141 if err != nil {
1142 l.Error("could not create service client", "err", err)
1143 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1144 return
1145 }
1146
1147 err = tangled.RepoCreate(
1148 r.Context(),
1149 client,
1150 &tangled.RepoCreate_Input{
1151 Rkey: rkey,
1152 Source: &forkSourceUrl,
1153 },
1154 )
1155 if err := xrpcclient.HandleXrpcErr(err); err != nil {
1156 rp.pages.Notice(w, "repo", err.Error())
1157 return
1158 }
1159
1160 err = db.AddRepo(tx, repo)
1161 if err != nil {
1162 l.Error("failed to AddRepo", "err", err)
1163 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1164 return
1165 }
1166
1167 // acls
1168 p, _ := securejoin.SecureJoin(user.Did, forkName)
1169 err = rp.enforcer.AddRepo(user.Did, targetKnot, p)
1170 if err != nil {
1171 l.Error("failed to add ACLs", "err", err)
1172 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1173 return
1174 }
1175
1176 err = tx.Commit()
1177 if err != nil {
1178 l.Error("failed to commit changes", "err", err)
1179 http.Error(w, err.Error(), http.StatusInternalServerError)
1180 return
1181 }
1182
1183 err = rp.enforcer.E.SavePolicy()
1184 if err != nil {
1185 l.Error("failed to update ACLs", "err", err)
1186 http.Error(w, err.Error(), http.StatusInternalServerError)
1187 return
1188 }
1189
1190 // reset the ATURI because the transaction completed successfully
1191 aturi = ""
1192
1193 rp.notifier.NewRepo(r.Context(), repo)
1194 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName))
1195 }
1196}
1197
1198// this is used to rollback changes made to the PDS
1199//
1200// it is a no-op if the provided ATURI is empty
1201func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
1202 if aturi == "" {
1203 return nil
1204 }
1205
1206 parsed := syntax.ATURI(aturi)
1207
1208 collection := parsed.Collection().String()
1209 repo := parsed.Authority().String()
1210 rkey := parsed.RecordKey().String()
1211
1212 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1213 Collection: collection,
1214 Repo: repo,
1215 Rkey: rkey,
1216 })
1217 return err
1218}