1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/appview/notify"
20 "tangled.org/core/appview/oauth"
21 "tangled.org/core/appview/pages"
22 "tangled.org/core/appview/reporesolver"
23 "tangled.org/core/appview/validator"
24 xrpcclient "tangled.org/core/appview/xrpcclient"
25 "tangled.org/core/eventconsumer"
26 "tangled.org/core/idresolver"
27 "tangled.org/core/rbac"
28 "tangled.org/core/tid"
29 "tangled.org/core/xrpc/serviceauth"
30
31 comatproto "github.com/bluesky-social/indigo/api/atproto"
32 atpclient "github.com/bluesky-social/indigo/atproto/client"
33 "github.com/bluesky-social/indigo/atproto/syntax"
34 lexutil "github.com/bluesky-social/indigo/lex/util"
35 securejoin "github.com/cyphar/filepath-securejoin"
36 "github.com/go-chi/chi/v5"
37)
38
39type Repo struct {
40 repoResolver *reporesolver.RepoResolver
41 idResolver *idresolver.Resolver
42 config *config.Config
43 oauth *oauth.OAuth
44 pages *pages.Pages
45 spindlestream *eventconsumer.Consumer
46 db *db.DB
47 enforcer *rbac.Enforcer
48 notifier notify.Notifier
49 logger *slog.Logger
50 serviceAuth *serviceauth.ServiceAuth
51 validator *validator.Validator
52}
53
54func New(
55 oauth *oauth.OAuth,
56 repoResolver *reporesolver.RepoResolver,
57 pages *pages.Pages,
58 spindlestream *eventconsumer.Consumer,
59 idResolver *idresolver.Resolver,
60 db *db.DB,
61 config *config.Config,
62 notifier notify.Notifier,
63 enforcer *rbac.Enforcer,
64 logger *slog.Logger,
65 validator *validator.Validator,
66) *Repo {
67 return &Repo{oauth: oauth,
68 repoResolver: repoResolver,
69 pages: pages,
70 idResolver: idResolver,
71 config: config,
72 spindlestream: spindlestream,
73 db: db,
74 notifier: notifier,
75 enforcer: enforcer,
76 logger: logger,
77 validator: validator,
78 }
79}
80
81// isTextualMimeType returns true if the MIME type represents textual content
82
83// modify the spindle configured for this repo
84func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
85 user := rp.oauth.GetUser(r)
86 l := rp.logger.With("handler", "EditSpindle")
87 l = l.With("did", user.Did)
88
89 errorId := "operation-error"
90 fail := func(msg string, err error) {
91 l.Error(msg, "err", err)
92 rp.pages.Notice(w, errorId, msg)
93 }
94
95 f, err := rp.repoResolver.Resolve(r)
96 if err != nil {
97 fail("Failed to resolve repo. Try again later", err)
98 return
99 }
100
101 newSpindle := r.FormValue("spindle")
102 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
103 client, err := rp.oauth.AuthorizedClient(r)
104 if err != nil {
105 fail("Failed to authorize. Try again later.", err)
106 return
107 }
108
109 if !removingSpindle {
110 // ensure that this is a valid spindle for this user
111 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did)
112 if err != nil {
113 fail("Failed to find spindles. Try again later.", err)
114 return
115 }
116
117 if !slices.Contains(validSpindles, newSpindle) {
118 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
119 return
120 }
121 }
122
123 newRepo := f.Repo
124 newRepo.Spindle = newSpindle
125 record := newRepo.AsRecord()
126
127 spindlePtr := &newSpindle
128 if removingSpindle {
129 spindlePtr = nil
130 newRepo.Spindle = ""
131 }
132
133 // optimistic update
134 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr)
135 if err != nil {
136 fail("Failed to update spindle. Try again later.", err)
137 return
138 }
139
140 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
141 if err != nil {
142 fail("Failed to update spindle, no record found on PDS.", err)
143 return
144 }
145 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
146 Collection: tangled.RepoNSID,
147 Repo: newRepo.Did,
148 Rkey: newRepo.Rkey,
149 SwapRecord: ex.Cid,
150 Record: &lexutil.LexiconTypeDecoder{
151 Val: &record,
152 },
153 })
154
155 if err != nil {
156 fail("Failed to update spindle, unable to save to PDS.", err)
157 return
158 }
159
160 if !removingSpindle {
161 // add this spindle to spindle stream
162 rp.spindlestream.AddSource(
163 context.Background(),
164 eventconsumer.NewSpindleSource(newSpindle),
165 )
166 }
167
168 rp.pages.HxRefresh(w)
169}
170
171func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
172 user := rp.oauth.GetUser(r)
173 l := rp.logger.With("handler", "AddLabel")
174 l = l.With("did", user.Did)
175
176 f, err := rp.repoResolver.Resolve(r)
177 if err != nil {
178 l.Error("failed to get repo and knot", "err", err)
179 return
180 }
181
182 errorId := "add-label-error"
183 fail := func(msg string, err error) {
184 l.Error(msg, "err", err)
185 rp.pages.Notice(w, errorId, msg)
186 }
187
188 // get form values for label definition
189 name := r.FormValue("name")
190 concreteType := r.FormValue("valueType")
191 valueFormat := r.FormValue("valueFormat")
192 enumValues := r.FormValue("enumValues")
193 scope := r.Form["scope"]
194 color := r.FormValue("color")
195 multiple := r.FormValue("multiple") == "true"
196
197 var variants []string
198 for part := range strings.SplitSeq(enumValues, ",") {
199 if part = strings.TrimSpace(part); part != "" {
200 variants = append(variants, part)
201 }
202 }
203
204 if concreteType == "" {
205 concreteType = "null"
206 }
207
208 format := models.ValueTypeFormatAny
209 if valueFormat == "did" {
210 format = models.ValueTypeFormatDid
211 }
212
213 valueType := models.ValueType{
214 Type: models.ConcreteType(concreteType),
215 Format: format,
216 Enum: variants,
217 }
218
219 label := models.LabelDefinition{
220 Did: user.Did,
221 Rkey: tid.TID(),
222 Name: name,
223 ValueType: valueType,
224 Scope: scope,
225 Color: &color,
226 Multiple: multiple,
227 Created: time.Now(),
228 }
229 if err := rp.validator.ValidateLabelDefinition(&label); err != nil {
230 fail(err.Error(), err)
231 return
232 }
233
234 // announce this relation into the firehose, store into owners' pds
235 client, err := rp.oauth.AuthorizedClient(r)
236 if err != nil {
237 fail(err.Error(), err)
238 return
239 }
240
241 // emit a labelRecord
242 labelRecord := label.AsRecord()
243 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
244 Collection: tangled.LabelDefinitionNSID,
245 Repo: label.Did,
246 Rkey: label.Rkey,
247 Record: &lexutil.LexiconTypeDecoder{
248 Val: &labelRecord,
249 },
250 })
251 // invalid record
252 if err != nil {
253 fail("Failed to write record to PDS.", err)
254 return
255 }
256
257 aturi := resp.Uri
258 l = l.With("at-uri", aturi)
259 l.Info("wrote label record to PDS")
260
261 // update the repo to subscribe to this label
262 newRepo := f.Repo
263 newRepo.Labels = append(newRepo.Labels, aturi)
264 repoRecord := newRepo.AsRecord()
265
266 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
267 if err != nil {
268 fail("Failed to update labels, no record found on PDS.", err)
269 return
270 }
271 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
272 Collection: tangled.RepoNSID,
273 Repo: newRepo.Did,
274 Rkey: newRepo.Rkey,
275 SwapRecord: ex.Cid,
276 Record: &lexutil.LexiconTypeDecoder{
277 Val: &repoRecord,
278 },
279 })
280 if err != nil {
281 fail("Failed to update labels for repo.", err)
282 return
283 }
284
285 tx, err := rp.db.BeginTx(r.Context(), nil)
286 if err != nil {
287 fail("Failed to add label.", err)
288 return
289 }
290
291 rollback := func() {
292 err1 := tx.Rollback()
293 err2 := rollbackRecord(context.Background(), aturi, client)
294
295 // ignore txn complete errors, this is okay
296 if errors.Is(err1, sql.ErrTxDone) {
297 err1 = nil
298 }
299
300 if errs := errors.Join(err1, err2); errs != nil {
301 l.Error("failed to rollback changes", "errs", errs)
302 return
303 }
304 }
305 defer rollback()
306
307 _, err = db.AddLabelDefinition(tx, &label)
308 if err != nil {
309 fail("Failed to add label.", err)
310 return
311 }
312
313 err = db.SubscribeLabel(tx, &models.RepoLabel{
314 RepoAt: f.RepoAt(),
315 LabelAt: label.AtUri(),
316 })
317
318 err = tx.Commit()
319 if err != nil {
320 fail("Failed to add label.", err)
321 return
322 }
323
324 // clear aturi when everything is successful
325 aturi = ""
326
327 rp.pages.HxRefresh(w)
328}
329
330func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
331 user := rp.oauth.GetUser(r)
332 l := rp.logger.With("handler", "DeleteLabel")
333 l = l.With("did", user.Did)
334
335 f, err := rp.repoResolver.Resolve(r)
336 if err != nil {
337 l.Error("failed to get repo and knot", "err", err)
338 return
339 }
340
341 errorId := "label-operation"
342 fail := func(msg string, err error) {
343 l.Error(msg, "err", err)
344 rp.pages.Notice(w, errorId, msg)
345 }
346
347 // get form values
348 labelId := r.FormValue("label-id")
349
350 label, err := db.GetLabelDefinition(rp.db, db.FilterEq("id", labelId))
351 if err != nil {
352 fail("Failed to find label definition.", err)
353 return
354 }
355
356 client, err := rp.oauth.AuthorizedClient(r)
357 if err != nil {
358 fail(err.Error(), err)
359 return
360 }
361
362 // delete label record from PDS
363 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
364 Collection: tangled.LabelDefinitionNSID,
365 Repo: label.Did,
366 Rkey: label.Rkey,
367 })
368 if err != nil {
369 fail("Failed to delete label record from PDS.", err)
370 return
371 }
372
373 // update repo record to remove the label reference
374 newRepo := f.Repo
375 var updated []string
376 removedAt := label.AtUri().String()
377 for _, l := range newRepo.Labels {
378 if l != removedAt {
379 updated = append(updated, l)
380 }
381 }
382 newRepo.Labels = updated
383 repoRecord := newRepo.AsRecord()
384
385 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
386 if err != nil {
387 fail("Failed to update labels, no record found on PDS.", err)
388 return
389 }
390 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
391 Collection: tangled.RepoNSID,
392 Repo: newRepo.Did,
393 Rkey: newRepo.Rkey,
394 SwapRecord: ex.Cid,
395 Record: &lexutil.LexiconTypeDecoder{
396 Val: &repoRecord,
397 },
398 })
399 if err != nil {
400 fail("Failed to update repo record.", err)
401 return
402 }
403
404 // transaction for DB changes
405 tx, err := rp.db.BeginTx(r.Context(), nil)
406 if err != nil {
407 fail("Failed to delete label.", err)
408 return
409 }
410 defer tx.Rollback()
411
412 err = db.UnsubscribeLabel(
413 tx,
414 db.FilterEq("repo_at", f.RepoAt()),
415 db.FilterEq("label_at", removedAt),
416 )
417 if err != nil {
418 fail("Failed to unsubscribe label.", err)
419 return
420 }
421
422 err = db.DeleteLabelDefinition(tx, db.FilterEq("id", label.Id))
423 if err != nil {
424 fail("Failed to delete label definition.", err)
425 return
426 }
427
428 err = tx.Commit()
429 if err != nil {
430 fail("Failed to delete label.", err)
431 return
432 }
433
434 // everything succeeded
435 rp.pages.HxRefresh(w)
436}
437
438func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
439 user := rp.oauth.GetUser(r)
440 l := rp.logger.With("handler", "SubscribeLabel")
441 l = l.With("did", user.Did)
442
443 f, err := rp.repoResolver.Resolve(r)
444 if err != nil {
445 l.Error("failed to get repo and knot", "err", err)
446 return
447 }
448
449 if err := r.ParseForm(); err != nil {
450 l.Error("invalid form", "err", err)
451 return
452 }
453
454 errorId := "default-label-operation"
455 fail := func(msg string, err error) {
456 l.Error(msg, "err", err)
457 rp.pages.Notice(w, errorId, msg)
458 }
459
460 labelAts := r.Form["label"]
461 _, err = db.GetLabelDefinitions(rp.db, db.FilterIn("at_uri", labelAts))
462 if err != nil {
463 fail("Failed to subscribe to label.", err)
464 return
465 }
466
467 newRepo := f.Repo
468 newRepo.Labels = append(newRepo.Labels, labelAts...)
469
470 // dedup
471 slices.Sort(newRepo.Labels)
472 newRepo.Labels = slices.Compact(newRepo.Labels)
473
474 repoRecord := newRepo.AsRecord()
475
476 client, err := rp.oauth.AuthorizedClient(r)
477 if err != nil {
478 fail(err.Error(), err)
479 return
480 }
481
482 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Repo.Did, f.Repo.Rkey)
483 if err != nil {
484 fail("Failed to update labels, no record found on PDS.", err)
485 return
486 }
487 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
488 Collection: tangled.RepoNSID,
489 Repo: newRepo.Did,
490 Rkey: newRepo.Rkey,
491 SwapRecord: ex.Cid,
492 Record: &lexutil.LexiconTypeDecoder{
493 Val: &repoRecord,
494 },
495 })
496
497 tx, err := rp.db.Begin()
498 if err != nil {
499 fail("Failed to subscribe to label.", err)
500 return
501 }
502 defer tx.Rollback()
503
504 for _, l := range labelAts {
505 err = db.SubscribeLabel(tx, &models.RepoLabel{
506 RepoAt: f.RepoAt(),
507 LabelAt: syntax.ATURI(l),
508 })
509 if err != nil {
510 fail("Failed to subscribe to label.", err)
511 return
512 }
513 }
514
515 if err := tx.Commit(); err != nil {
516 fail("Failed to subscribe to label.", err)
517 return
518 }
519
520 // everything succeeded
521 rp.pages.HxRefresh(w)
522}
523
524func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
525 user := rp.oauth.GetUser(r)
526 l := rp.logger.With("handler", "UnsubscribeLabel")
527 l = l.With("did", user.Did)
528
529 f, err := rp.repoResolver.Resolve(r)
530 if err != nil {
531 l.Error("failed to get repo and knot", "err", err)
532 return
533 }
534
535 if err := r.ParseForm(); err != nil {
536 l.Error("invalid form", "err", err)
537 return
538 }
539
540 errorId := "default-label-operation"
541 fail := func(msg string, err error) {
542 l.Error(msg, "err", err)
543 rp.pages.Notice(w, errorId, msg)
544 }
545
546 labelAts := r.Form["label"]
547 _, err = db.GetLabelDefinitions(rp.db, db.FilterIn("at_uri", labelAts))
548 if err != nil {
549 fail("Failed to unsubscribe to label.", err)
550 return
551 }
552
553 // update repo record to remove the label reference
554 newRepo := f.Repo
555 var updated []string
556 for _, l := range newRepo.Labels {
557 if !slices.Contains(labelAts, l) {
558 updated = append(updated, l)
559 }
560 }
561 newRepo.Labels = updated
562 repoRecord := newRepo.AsRecord()
563
564 client, err := rp.oauth.AuthorizedClient(r)
565 if err != nil {
566 fail(err.Error(), err)
567 return
568 }
569
570 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Repo.Did, f.Repo.Rkey)
571 if err != nil {
572 fail("Failed to update labels, no record found on PDS.", err)
573 return
574 }
575 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
576 Collection: tangled.RepoNSID,
577 Repo: newRepo.Did,
578 Rkey: newRepo.Rkey,
579 SwapRecord: ex.Cid,
580 Record: &lexutil.LexiconTypeDecoder{
581 Val: &repoRecord,
582 },
583 })
584
585 err = db.UnsubscribeLabel(
586 rp.db,
587 db.FilterEq("repo_at", f.RepoAt()),
588 db.FilterIn("label_at", labelAts),
589 )
590 if err != nil {
591 fail("Failed to unsubscribe label.", err)
592 return
593 }
594
595 // everything succeeded
596 rp.pages.HxRefresh(w)
597}
598
599func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
600 l := rp.logger.With("handler", "LabelPanel")
601
602 f, err := rp.repoResolver.Resolve(r)
603 if err != nil {
604 l.Error("failed to get repo and knot", "err", err)
605 return
606 }
607
608 subjectStr := r.FormValue("subject")
609 subject, err := syntax.ParseATURI(subjectStr)
610 if err != nil {
611 l.Error("failed to get repo and knot", "err", err)
612 return
613 }
614
615 labelDefs, err := db.GetLabelDefinitions(
616 rp.db,
617 db.FilterIn("at_uri", f.Repo.Labels),
618 db.FilterContains("scope", subject.Collection().String()),
619 )
620 if err != nil {
621 l.Error("failed to fetch label defs", "err", err)
622 return
623 }
624
625 defs := make(map[string]*models.LabelDefinition)
626 for _, l := range labelDefs {
627 defs[l.AtUri().String()] = &l
628 }
629
630 states, err := db.GetLabels(rp.db, db.FilterEq("subject", subject))
631 if err != nil {
632 l.Error("failed to build label state", "err", err)
633 return
634 }
635 state := states[subject]
636
637 user := rp.oauth.GetUser(r)
638 rp.pages.LabelPanel(w, pages.LabelPanelParams{
639 LoggedInUser: user,
640 RepoInfo: f.RepoInfo(user),
641 Defs: defs,
642 Subject: subject.String(),
643 State: state,
644 })
645}
646
647func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
648 l := rp.logger.With("handler", "EditLabelPanel")
649
650 f, err := rp.repoResolver.Resolve(r)
651 if err != nil {
652 l.Error("failed to get repo and knot", "err", err)
653 return
654 }
655
656 subjectStr := r.FormValue("subject")
657 subject, err := syntax.ParseATURI(subjectStr)
658 if err != nil {
659 l.Error("failed to get repo and knot", "err", err)
660 return
661 }
662
663 labelDefs, err := db.GetLabelDefinitions(
664 rp.db,
665 db.FilterIn("at_uri", f.Repo.Labels),
666 db.FilterContains("scope", subject.Collection().String()),
667 )
668 if err != nil {
669 l.Error("failed to fetch labels", "err", err)
670 return
671 }
672
673 defs := make(map[string]*models.LabelDefinition)
674 for _, l := range labelDefs {
675 defs[l.AtUri().String()] = &l
676 }
677
678 states, err := db.GetLabels(rp.db, db.FilterEq("subject", subject))
679 if err != nil {
680 l.Error("failed to build label state", "err", err)
681 return
682 }
683 state := states[subject]
684
685 user := rp.oauth.GetUser(r)
686 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
687 LoggedInUser: user,
688 RepoInfo: f.RepoInfo(user),
689 Defs: defs,
690 Subject: subject.String(),
691 State: state,
692 })
693}
694
695func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
696 user := rp.oauth.GetUser(r)
697 l := rp.logger.With("handler", "AddCollaborator")
698 l = l.With("did", user.Did)
699
700 f, err := rp.repoResolver.Resolve(r)
701 if err != nil {
702 l.Error("failed to get repo and knot", "err", err)
703 return
704 }
705
706 errorId := "add-collaborator-error"
707 fail := func(msg string, err error) {
708 l.Error(msg, "err", err)
709 rp.pages.Notice(w, errorId, msg)
710 }
711
712 collaborator := r.FormValue("collaborator")
713 if collaborator == "" {
714 fail("Invalid form.", nil)
715 return
716 }
717
718 // remove a single leading `@`, to make @handle work with ResolveIdent
719 collaborator = strings.TrimPrefix(collaborator, "@")
720
721 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
722 if err != nil {
723 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
724 return
725 }
726
727 if collaboratorIdent.DID.String() == user.Did {
728 fail("You seem to be adding yourself as a collaborator.", nil)
729 return
730 }
731 l = l.With("collaborator", collaboratorIdent.Handle)
732 l = l.With("knot", f.Knot)
733
734 // announce this relation into the firehose, store into owners' pds
735 client, err := rp.oauth.AuthorizedClient(r)
736 if err != nil {
737 fail("Failed to write to PDS.", err)
738 return
739 }
740
741 // emit a record
742 currentUser := rp.oauth.GetUser(r)
743 rkey := tid.TID()
744 createdAt := time.Now()
745 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
746 Collection: tangled.RepoCollaboratorNSID,
747 Repo: currentUser.Did,
748 Rkey: rkey,
749 Record: &lexutil.LexiconTypeDecoder{
750 Val: &tangled.RepoCollaborator{
751 Subject: collaboratorIdent.DID.String(),
752 Repo: string(f.RepoAt()),
753 CreatedAt: createdAt.Format(time.RFC3339),
754 }},
755 })
756 // invalid record
757 if err != nil {
758 fail("Failed to write record to PDS.", err)
759 return
760 }
761
762 aturi := resp.Uri
763 l = l.With("at-uri", aturi)
764 l.Info("wrote record to PDS")
765
766 tx, err := rp.db.BeginTx(r.Context(), nil)
767 if err != nil {
768 fail("Failed to add collaborator.", err)
769 return
770 }
771
772 rollback := func() {
773 err1 := tx.Rollback()
774 err2 := rp.enforcer.E.LoadPolicy()
775 err3 := rollbackRecord(context.Background(), aturi, client)
776
777 // ignore txn complete errors, this is okay
778 if errors.Is(err1, sql.ErrTxDone) {
779 err1 = nil
780 }
781
782 if errs := errors.Join(err1, err2, err3); errs != nil {
783 l.Error("failed to rollback changes", "errs", errs)
784 return
785 }
786 }
787 defer rollback()
788
789 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo())
790 if err != nil {
791 fail("Failed to add collaborator permissions.", err)
792 return
793 }
794
795 err = db.AddCollaborator(tx, models.Collaborator{
796 Did: syntax.DID(currentUser.Did),
797 Rkey: rkey,
798 SubjectDid: collaboratorIdent.DID,
799 RepoAt: f.RepoAt(),
800 Created: createdAt,
801 })
802 if err != nil {
803 fail("Failed to add collaborator.", err)
804 return
805 }
806
807 err = tx.Commit()
808 if err != nil {
809 fail("Failed to add collaborator.", err)
810 return
811 }
812
813 err = rp.enforcer.E.SavePolicy()
814 if err != nil {
815 fail("Failed to update collaborator permissions.", err)
816 return
817 }
818
819 // clear aturi to when everything is successful
820 aturi = ""
821
822 rp.pages.HxRefresh(w)
823}
824
825func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
826 user := rp.oauth.GetUser(r)
827 l := rp.logger.With("handler", "DeleteRepo")
828
829 noticeId := "operation-error"
830 f, err := rp.repoResolver.Resolve(r)
831 if err != nil {
832 l.Error("failed to get repo and knot", "err", err)
833 return
834 }
835
836 // remove record from pds
837 atpClient, err := rp.oauth.AuthorizedClient(r)
838 if err != nil {
839 l.Error("failed to get authorized client", "err", err)
840 return
841 }
842 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
843 Collection: tangled.RepoNSID,
844 Repo: user.Did,
845 Rkey: f.Rkey,
846 })
847 if err != nil {
848 l.Error("failed to delete record", "err", err)
849 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
850 return
851 }
852 l.Info("removed repo record", "aturi", f.RepoAt().String())
853
854 client, err := rp.oauth.ServiceClient(
855 r,
856 oauth.WithService(f.Knot),
857 oauth.WithLxm(tangled.RepoDeleteNSID),
858 oauth.WithDev(rp.config.Core.Dev),
859 )
860 if err != nil {
861 l.Error("failed to connect to knot server", "err", err)
862 return
863 }
864
865 err = tangled.RepoDelete(
866 r.Context(),
867 client,
868 &tangled.RepoDelete_Input{
869 Did: f.OwnerDid(),
870 Name: f.Name,
871 Rkey: f.Rkey,
872 },
873 )
874 if err := xrpcclient.HandleXrpcErr(err); err != nil {
875 rp.pages.Notice(w, noticeId, err.Error())
876 return
877 }
878 l.Info("deleted repo from knot")
879
880 tx, err := rp.db.BeginTx(r.Context(), nil)
881 if err != nil {
882 l.Error("failed to start tx")
883 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
884 return
885 }
886 defer func() {
887 tx.Rollback()
888 err = rp.enforcer.E.LoadPolicy()
889 if err != nil {
890 l.Error("failed to rollback policies")
891 }
892 }()
893
894 // remove collaborator RBAC
895 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot)
896 if err != nil {
897 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
898 return
899 }
900 for _, c := range repoCollaborators {
901 did := c[0]
902 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo())
903 }
904 l.Info("removed collaborators")
905
906 // remove repo RBAC
907 err = rp.enforcer.RemoveRepo(f.OwnerDid(), f.Knot, f.DidSlashRepo())
908 if err != nil {
909 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
910 return
911 }
912
913 // remove repo from db
914 err = db.RemoveRepo(tx, f.OwnerDid(), f.Name)
915 if err != nil {
916 rp.pages.Notice(w, noticeId, "Failed to update appview")
917 return
918 }
919 l.Info("removed repo from db")
920
921 err = tx.Commit()
922 if err != nil {
923 l.Error("failed to commit changes", "err", err)
924 http.Error(w, err.Error(), http.StatusInternalServerError)
925 return
926 }
927
928 err = rp.enforcer.E.SavePolicy()
929 if err != nil {
930 l.Error("failed to update ACLs", "err", err)
931 http.Error(w, err.Error(), http.StatusInternalServerError)
932 return
933 }
934
935 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.OwnerDid()))
936}
937
938func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
939 l := rp.logger.With("handler", "SyncRepoFork")
940
941 ref := chi.URLParam(r, "ref")
942 ref, _ = url.PathUnescape(ref)
943
944 user := rp.oauth.GetUser(r)
945 f, err := rp.repoResolver.Resolve(r)
946 if err != nil {
947 l.Error("failed to resolve source repo", "err", err)
948 return
949 }
950
951 switch r.Method {
952 case http.MethodPost:
953 client, err := rp.oauth.ServiceClient(
954 r,
955 oauth.WithService(f.Knot),
956 oauth.WithLxm(tangled.RepoForkSyncNSID),
957 oauth.WithDev(rp.config.Core.Dev),
958 )
959 if err != nil {
960 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
961 return
962 }
963
964 repoInfo := f.RepoInfo(user)
965 if repoInfo.Source == nil {
966 rp.pages.Notice(w, "repo", "This repository is not a fork.")
967 return
968 }
969
970 err = tangled.RepoForkSync(
971 r.Context(),
972 client,
973 &tangled.RepoForkSync_Input{
974 Did: user.Did,
975 Name: f.Name,
976 Source: repoInfo.Source.RepoAt().String(),
977 Branch: ref,
978 },
979 )
980 if err := xrpcclient.HandleXrpcErr(err); err != nil {
981 rp.pages.Notice(w, "repo", err.Error())
982 return
983 }
984
985 rp.pages.HxRefresh(w)
986 return
987 }
988}
989
990func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
991 l := rp.logger.With("handler", "ForkRepo")
992
993 user := rp.oauth.GetUser(r)
994 f, err := rp.repoResolver.Resolve(r)
995 if err != nil {
996 l.Error("failed to resolve source repo", "err", err)
997 return
998 }
999
1000 switch r.Method {
1001 case http.MethodGet:
1002 user := rp.oauth.GetUser(r)
1003 knots, err := rp.enforcer.GetKnotsForUser(user.Did)
1004 if err != nil {
1005 rp.pages.Notice(w, "repo", "Invalid user account.")
1006 return
1007 }
1008
1009 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1010 LoggedInUser: user,
1011 Knots: knots,
1012 RepoInfo: f.RepoInfo(user),
1013 })
1014
1015 case http.MethodPost:
1016 l := rp.logger.With("handler", "ForkRepo")
1017
1018 targetKnot := r.FormValue("knot")
1019 if targetKnot == "" {
1020 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1021 return
1022 }
1023 l = l.With("targetKnot", targetKnot)
1024
1025 ok, err := rp.enforcer.E.Enforce(user.Did, targetKnot, targetKnot, "repo:create")
1026 if err != nil || !ok {
1027 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1028 return
1029 }
1030
1031 // choose a name for a fork
1032 forkName := r.FormValue("repo_name")
1033 if forkName == "" {
1034 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1035 return
1036 }
1037
1038 // this check is *only* to see if the forked repo name already exists
1039 // in the user's account.
1040 existingRepo, err := db.GetRepo(
1041 rp.db,
1042 db.FilterEq("did", user.Did),
1043 db.FilterEq("name", forkName),
1044 )
1045 if err != nil {
1046 if !errors.Is(err, sql.ErrNoRows) {
1047 l.Error("error fetching existing repo from db", "err", err)
1048 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1049 return
1050 }
1051 } else if existingRepo != nil {
1052 // repo with this name already exists
1053 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1054 return
1055 }
1056 l = l.With("forkName", forkName)
1057
1058 uri := "https"
1059 if rp.config.Core.Dev {
1060 uri = "http"
1061 }
1062
1063 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.OwnerDid(), f.Repo.Name)
1064 l = l.With("cloneUrl", forkSourceUrl)
1065
1066 sourceAt := f.RepoAt().String()
1067
1068 // create an atproto record for this fork
1069 rkey := tid.TID()
1070 repo := &models.Repo{
1071 Did: user.Did,
1072 Name: forkName,
1073 Knot: targetKnot,
1074 Rkey: rkey,
1075 Source: sourceAt,
1076 Description: f.Repo.Description,
1077 Created: time.Now(),
1078 Labels: rp.config.Label.DefaultLabelDefs,
1079 }
1080 record := repo.AsRecord()
1081
1082 atpClient, err := rp.oauth.AuthorizedClient(r)
1083 if err != nil {
1084 l.Error("failed to create xrpcclient", "err", err)
1085 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1086 return
1087 }
1088
1089 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1090 Collection: tangled.RepoNSID,
1091 Repo: user.Did,
1092 Rkey: rkey,
1093 Record: &lexutil.LexiconTypeDecoder{
1094 Val: &record,
1095 },
1096 })
1097 if err != nil {
1098 l.Error("failed to write to PDS", "err", err)
1099 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1100 return
1101 }
1102
1103 aturi := atresp.Uri
1104 l = l.With("aturi", aturi)
1105 l.Info("wrote to PDS")
1106
1107 tx, err := rp.db.BeginTx(r.Context(), nil)
1108 if err != nil {
1109 l.Info("txn failed", "err", err)
1110 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1111 return
1112 }
1113
1114 // The rollback function reverts a few things on failure:
1115 // - the pending txn
1116 // - the ACLs
1117 // - the atproto record created
1118 rollback := func() {
1119 err1 := tx.Rollback()
1120 err2 := rp.enforcer.E.LoadPolicy()
1121 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1122
1123 // ignore txn complete errors, this is okay
1124 if errors.Is(err1, sql.ErrTxDone) {
1125 err1 = nil
1126 }
1127
1128 if errs := errors.Join(err1, err2, err3); errs != nil {
1129 l.Error("failed to rollback changes", "errs", errs)
1130 return
1131 }
1132 }
1133 defer rollback()
1134
1135 client, err := rp.oauth.ServiceClient(
1136 r,
1137 oauth.WithService(targetKnot),
1138 oauth.WithLxm(tangled.RepoCreateNSID),
1139 oauth.WithDev(rp.config.Core.Dev),
1140 )
1141 if err != nil {
1142 l.Error("could not create service client", "err", err)
1143 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1144 return
1145 }
1146
1147 err = tangled.RepoCreate(
1148 r.Context(),
1149 client,
1150 &tangled.RepoCreate_Input{
1151 Rkey: rkey,
1152 Source: &forkSourceUrl,
1153 },
1154 )
1155 if err := xrpcclient.HandleXrpcErr(err); err != nil {
1156 rp.pages.Notice(w, "repo", err.Error())
1157 return
1158 }
1159
1160 err = db.AddRepo(tx, repo)
1161 if err != nil {
1162 l.Error("failed to AddRepo", "err", err)
1163 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1164 return
1165 }
1166
1167 // acls
1168 p, _ := securejoin.SecureJoin(user.Did, forkName)
1169 err = rp.enforcer.AddRepo(user.Did, targetKnot, p)
1170 if err != nil {
1171 l.Error("failed to add ACLs", "err", err)
1172 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1173 return
1174 }
1175
1176 err = tx.Commit()
1177 if err != nil {
1178 l.Error("failed to commit changes", "err", err)
1179 http.Error(w, err.Error(), http.StatusInternalServerError)
1180 return
1181 }
1182
1183 err = rp.enforcer.E.SavePolicy()
1184 if err != nil {
1185 l.Error("failed to update ACLs", "err", err)
1186 http.Error(w, err.Error(), http.StatusInternalServerError)
1187 return
1188 }
1189
1190 // reset the ATURI because the transaction completed successfully
1191 aturi = ""
1192
1193 rp.notifier.NewRepo(r.Context(), repo)
1194 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName))
1195 }
1196}
1197
1198// this is used to rollback changes made to the PDS
1199//
1200// it is a no-op if the provided ATURI is empty
1201func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
1202 if aturi == "" {
1203 return nil
1204 }
1205
1206 parsed := syntax.ATURI(aturi)
1207
1208 collection := parsed.Collection().String()
1209 repo := parsed.Authority().String()
1210 rkey := parsed.RecordKey().String()
1211
1212 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1213 Collection: collection,
1214 Repo: repo,
1215 Rkey: rkey,
1216 })
1217 return err
1218}