1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/appview/notify"
20 "tangled.org/core/appview/oauth"
21 "tangled.org/core/appview/pages"
22 "tangled.org/core/appview/reporesolver"
23 "tangled.org/core/appview/validator"
24 xrpcclient "tangled.org/core/appview/xrpcclient"
25 "tangled.org/core/eventconsumer"
26 "tangled.org/core/idresolver"
27 "tangled.org/core/rbac"
28 "tangled.org/core/tid"
29 "tangled.org/core/xrpc/serviceauth"
30
31 comatproto "github.com/bluesky-social/indigo/api/atproto"
32 atpclient "github.com/bluesky-social/indigo/atproto/client"
33 "github.com/bluesky-social/indigo/atproto/syntax"
34 lexutil "github.com/bluesky-social/indigo/lex/util"
35 securejoin "github.com/cyphar/filepath-securejoin"
36 "github.com/go-chi/chi/v5"
37)
38
39type Repo struct {
40 repoResolver *reporesolver.RepoResolver
41 idResolver *idresolver.Resolver
42 config *config.Config
43 oauth *oauth.OAuth
44 pages *pages.Pages
45 spindlestream *eventconsumer.Consumer
46 db *db.DB
47 enforcer *rbac.Enforcer
48 notifier notify.Notifier
49 logger *slog.Logger
50 serviceAuth *serviceauth.ServiceAuth
51 validator *validator.Validator
52}
53
54func New(
55 oauth *oauth.OAuth,
56 repoResolver *reporesolver.RepoResolver,
57 pages *pages.Pages,
58 spindlestream *eventconsumer.Consumer,
59 idResolver *idresolver.Resolver,
60 db *db.DB,
61 config *config.Config,
62 notifier notify.Notifier,
63 enforcer *rbac.Enforcer,
64 logger *slog.Logger,
65 validator *validator.Validator,
66) *Repo {
67 return &Repo{oauth: oauth,
68 repoResolver: repoResolver,
69 pages: pages,
70 idResolver: idResolver,
71 config: config,
72 spindlestream: spindlestream,
73 db: db,
74 notifier: notifier,
75 enforcer: enforcer,
76 logger: logger,
77 validator: validator,
78 }
79}
80
81// modify the spindle configured for this repo
82func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
83 user := rp.oauth.GetUser(r)
84 l := rp.logger.With("handler", "EditSpindle")
85 l = l.With("did", user.Did)
86
87 errorId := "operation-error"
88 fail := func(msg string, err error) {
89 l.Error(msg, "err", err)
90 rp.pages.Notice(w, errorId, msg)
91 }
92
93 f, err := rp.repoResolver.Resolve(r)
94 if err != nil {
95 fail("Failed to resolve repo. Try again later", err)
96 return
97 }
98
99 newSpindle := r.FormValue("spindle")
100 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
101 client, err := rp.oauth.AuthorizedClient(r)
102 if err != nil {
103 fail("Failed to authorize. Try again later.", err)
104 return
105 }
106
107 if !removingSpindle {
108 // ensure that this is a valid spindle for this user
109 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Did)
110 if err != nil {
111 fail("Failed to find spindles. Try again later.", err)
112 return
113 }
114
115 if !slices.Contains(validSpindles, newSpindle) {
116 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
117 return
118 }
119 }
120
121 newRepo := f.Repo
122 newRepo.Spindle = newSpindle
123 record := newRepo.AsRecord()
124
125 spindlePtr := &newSpindle
126 if removingSpindle {
127 spindlePtr = nil
128 newRepo.Spindle = ""
129 }
130
131 // optimistic update
132 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr)
133 if err != nil {
134 fail("Failed to update spindle. Try again later.", err)
135 return
136 }
137
138 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
139 if err != nil {
140 fail("Failed to update spindle, no record found on PDS.", err)
141 return
142 }
143 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
144 Collection: tangled.RepoNSID,
145 Repo: newRepo.Did,
146 Rkey: newRepo.Rkey,
147 SwapRecord: ex.Cid,
148 Record: &lexutil.LexiconTypeDecoder{
149 Val: &record,
150 },
151 })
152
153 if err != nil {
154 fail("Failed to update spindle, unable to save to PDS.", err)
155 return
156 }
157
158 if !removingSpindle {
159 // add this spindle to spindle stream
160 rp.spindlestream.AddSource(
161 context.Background(),
162 eventconsumer.NewSpindleSource(newSpindle),
163 )
164 }
165
166 rp.pages.HxRefresh(w)
167}
168
169func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
170 user := rp.oauth.GetUser(r)
171 l := rp.logger.With("handler", "AddLabel")
172 l = l.With("did", user.Did)
173
174 f, err := rp.repoResolver.Resolve(r)
175 if err != nil {
176 l.Error("failed to get repo and knot", "err", err)
177 return
178 }
179
180 errorId := "add-label-error"
181 fail := func(msg string, err error) {
182 l.Error(msg, "err", err)
183 rp.pages.Notice(w, errorId, msg)
184 }
185
186 // get form values for label definition
187 name := r.FormValue("name")
188 concreteType := r.FormValue("valueType")
189 valueFormat := r.FormValue("valueFormat")
190 enumValues := r.FormValue("enumValues")
191 scope := r.Form["scope"]
192 color := r.FormValue("color")
193 multiple := r.FormValue("multiple") == "true"
194
195 var variants []string
196 for part := range strings.SplitSeq(enumValues, ",") {
197 if part = strings.TrimSpace(part); part != "" {
198 variants = append(variants, part)
199 }
200 }
201
202 if concreteType == "" {
203 concreteType = "null"
204 }
205
206 format := models.ValueTypeFormatAny
207 if valueFormat == "did" {
208 format = models.ValueTypeFormatDid
209 }
210
211 valueType := models.ValueType{
212 Type: models.ConcreteType(concreteType),
213 Format: format,
214 Enum: variants,
215 }
216
217 label := models.LabelDefinition{
218 Did: user.Did,
219 Rkey: tid.TID(),
220 Name: name,
221 ValueType: valueType,
222 Scope: scope,
223 Color: &color,
224 Multiple: multiple,
225 Created: time.Now(),
226 }
227 if err := rp.validator.ValidateLabelDefinition(&label); err != nil {
228 fail(err.Error(), err)
229 return
230 }
231
232 // announce this relation into the firehose, store into owners' pds
233 client, err := rp.oauth.AuthorizedClient(r)
234 if err != nil {
235 fail(err.Error(), err)
236 return
237 }
238
239 // emit a labelRecord
240 labelRecord := label.AsRecord()
241 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
242 Collection: tangled.LabelDefinitionNSID,
243 Repo: label.Did,
244 Rkey: label.Rkey,
245 Record: &lexutil.LexiconTypeDecoder{
246 Val: &labelRecord,
247 },
248 })
249 // invalid record
250 if err != nil {
251 fail("Failed to write record to PDS.", err)
252 return
253 }
254
255 aturi := resp.Uri
256 l = l.With("at-uri", aturi)
257 l.Info("wrote label record to PDS")
258
259 // update the repo to subscribe to this label
260 newRepo := f.Repo
261 newRepo.Labels = append(newRepo.Labels, aturi)
262 repoRecord := newRepo.AsRecord()
263
264 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
265 if err != nil {
266 fail("Failed to update labels, no record found on PDS.", err)
267 return
268 }
269 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
270 Collection: tangled.RepoNSID,
271 Repo: newRepo.Did,
272 Rkey: newRepo.Rkey,
273 SwapRecord: ex.Cid,
274 Record: &lexutil.LexiconTypeDecoder{
275 Val: &repoRecord,
276 },
277 })
278 if err != nil {
279 fail("Failed to update labels for repo.", err)
280 return
281 }
282
283 tx, err := rp.db.BeginTx(r.Context(), nil)
284 if err != nil {
285 fail("Failed to add label.", err)
286 return
287 }
288
289 rollback := func() {
290 err1 := tx.Rollback()
291 err2 := rollbackRecord(context.Background(), aturi, client)
292
293 // ignore txn complete errors, this is okay
294 if errors.Is(err1, sql.ErrTxDone) {
295 err1 = nil
296 }
297
298 if errs := errors.Join(err1, err2); errs != nil {
299 l.Error("failed to rollback changes", "errs", errs)
300 return
301 }
302 }
303 defer rollback()
304
305 _, err = db.AddLabelDefinition(tx, &label)
306 if err != nil {
307 fail("Failed to add label.", err)
308 return
309 }
310
311 err = db.SubscribeLabel(tx, &models.RepoLabel{
312 RepoAt: f.RepoAt(),
313 LabelAt: label.AtUri(),
314 })
315
316 err = tx.Commit()
317 if err != nil {
318 fail("Failed to add label.", err)
319 return
320 }
321
322 // clear aturi when everything is successful
323 aturi = ""
324
325 rp.pages.HxRefresh(w)
326}
327
328func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
329 user := rp.oauth.GetUser(r)
330 l := rp.logger.With("handler", "DeleteLabel")
331 l = l.With("did", user.Did)
332
333 f, err := rp.repoResolver.Resolve(r)
334 if err != nil {
335 l.Error("failed to get repo and knot", "err", err)
336 return
337 }
338
339 errorId := "label-operation"
340 fail := func(msg string, err error) {
341 l.Error(msg, "err", err)
342 rp.pages.Notice(w, errorId, msg)
343 }
344
345 // get form values
346 labelId := r.FormValue("label-id")
347
348 label, err := db.GetLabelDefinition(rp.db, db.FilterEq("id", labelId))
349 if err != nil {
350 fail("Failed to find label definition.", err)
351 return
352 }
353
354 client, err := rp.oauth.AuthorizedClient(r)
355 if err != nil {
356 fail(err.Error(), err)
357 return
358 }
359
360 // delete label record from PDS
361 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
362 Collection: tangled.LabelDefinitionNSID,
363 Repo: label.Did,
364 Rkey: label.Rkey,
365 })
366 if err != nil {
367 fail("Failed to delete label record from PDS.", err)
368 return
369 }
370
371 // update repo record to remove the label reference
372 newRepo := f.Repo
373 var updated []string
374 removedAt := label.AtUri().String()
375 for _, l := range newRepo.Labels {
376 if l != removedAt {
377 updated = append(updated, l)
378 }
379 }
380 newRepo.Labels = updated
381 repoRecord := newRepo.AsRecord()
382
383 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
384 if err != nil {
385 fail("Failed to update labels, no record found on PDS.", err)
386 return
387 }
388 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
389 Collection: tangled.RepoNSID,
390 Repo: newRepo.Did,
391 Rkey: newRepo.Rkey,
392 SwapRecord: ex.Cid,
393 Record: &lexutil.LexiconTypeDecoder{
394 Val: &repoRecord,
395 },
396 })
397 if err != nil {
398 fail("Failed to update repo record.", err)
399 return
400 }
401
402 // transaction for DB changes
403 tx, err := rp.db.BeginTx(r.Context(), nil)
404 if err != nil {
405 fail("Failed to delete label.", err)
406 return
407 }
408 defer tx.Rollback()
409
410 err = db.UnsubscribeLabel(
411 tx,
412 db.FilterEq("repo_at", f.RepoAt()),
413 db.FilterEq("label_at", removedAt),
414 )
415 if err != nil {
416 fail("Failed to unsubscribe label.", err)
417 return
418 }
419
420 err = db.DeleteLabelDefinition(tx, db.FilterEq("id", label.Id))
421 if err != nil {
422 fail("Failed to delete label definition.", err)
423 return
424 }
425
426 err = tx.Commit()
427 if err != nil {
428 fail("Failed to delete label.", err)
429 return
430 }
431
432 // everything succeeded
433 rp.pages.HxRefresh(w)
434}
435
436func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
437 user := rp.oauth.GetUser(r)
438 l := rp.logger.With("handler", "SubscribeLabel")
439 l = l.With("did", user.Did)
440
441 f, err := rp.repoResolver.Resolve(r)
442 if err != nil {
443 l.Error("failed to get repo and knot", "err", err)
444 return
445 }
446
447 if err := r.ParseForm(); err != nil {
448 l.Error("invalid form", "err", err)
449 return
450 }
451
452 errorId := "default-label-operation"
453 fail := func(msg string, err error) {
454 l.Error(msg, "err", err)
455 rp.pages.Notice(w, errorId, msg)
456 }
457
458 labelAts := r.Form["label"]
459 _, err = db.GetLabelDefinitions(rp.db, db.FilterIn("at_uri", labelAts))
460 if err != nil {
461 fail("Failed to subscribe to label.", err)
462 return
463 }
464
465 newRepo := f.Repo
466 newRepo.Labels = append(newRepo.Labels, labelAts...)
467
468 // dedup
469 slices.Sort(newRepo.Labels)
470 newRepo.Labels = slices.Compact(newRepo.Labels)
471
472 repoRecord := newRepo.AsRecord()
473
474 client, err := rp.oauth.AuthorizedClient(r)
475 if err != nil {
476 fail(err.Error(), err)
477 return
478 }
479
480 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Repo.Did, f.Repo.Rkey)
481 if err != nil {
482 fail("Failed to update labels, no record found on PDS.", err)
483 return
484 }
485 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
486 Collection: tangled.RepoNSID,
487 Repo: newRepo.Did,
488 Rkey: newRepo.Rkey,
489 SwapRecord: ex.Cid,
490 Record: &lexutil.LexiconTypeDecoder{
491 Val: &repoRecord,
492 },
493 })
494
495 tx, err := rp.db.Begin()
496 if err != nil {
497 fail("Failed to subscribe to label.", err)
498 return
499 }
500 defer tx.Rollback()
501
502 for _, l := range labelAts {
503 err = db.SubscribeLabel(tx, &models.RepoLabel{
504 RepoAt: f.RepoAt(),
505 LabelAt: syntax.ATURI(l),
506 })
507 if err != nil {
508 fail("Failed to subscribe to label.", err)
509 return
510 }
511 }
512
513 if err := tx.Commit(); err != nil {
514 fail("Failed to subscribe to label.", err)
515 return
516 }
517
518 // everything succeeded
519 rp.pages.HxRefresh(w)
520}
521
522func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
523 user := rp.oauth.GetUser(r)
524 l := rp.logger.With("handler", "UnsubscribeLabel")
525 l = l.With("did", user.Did)
526
527 f, err := rp.repoResolver.Resolve(r)
528 if err != nil {
529 l.Error("failed to get repo and knot", "err", err)
530 return
531 }
532
533 if err := r.ParseForm(); err != nil {
534 l.Error("invalid form", "err", err)
535 return
536 }
537
538 errorId := "default-label-operation"
539 fail := func(msg string, err error) {
540 l.Error(msg, "err", err)
541 rp.pages.Notice(w, errorId, msg)
542 }
543
544 labelAts := r.Form["label"]
545 _, err = db.GetLabelDefinitions(rp.db, db.FilterIn("at_uri", labelAts))
546 if err != nil {
547 fail("Failed to unsubscribe to label.", err)
548 return
549 }
550
551 // update repo record to remove the label reference
552 newRepo := f.Repo
553 var updated []string
554 for _, l := range newRepo.Labels {
555 if !slices.Contains(labelAts, l) {
556 updated = append(updated, l)
557 }
558 }
559 newRepo.Labels = updated
560 repoRecord := newRepo.AsRecord()
561
562 client, err := rp.oauth.AuthorizedClient(r)
563 if err != nil {
564 fail(err.Error(), err)
565 return
566 }
567
568 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Repo.Did, f.Repo.Rkey)
569 if err != nil {
570 fail("Failed to update labels, no record found on PDS.", err)
571 return
572 }
573 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
574 Collection: tangled.RepoNSID,
575 Repo: newRepo.Did,
576 Rkey: newRepo.Rkey,
577 SwapRecord: ex.Cid,
578 Record: &lexutil.LexiconTypeDecoder{
579 Val: &repoRecord,
580 },
581 })
582
583 err = db.UnsubscribeLabel(
584 rp.db,
585 db.FilterEq("repo_at", f.RepoAt()),
586 db.FilterIn("label_at", labelAts),
587 )
588 if err != nil {
589 fail("Failed to unsubscribe label.", err)
590 return
591 }
592
593 // everything succeeded
594 rp.pages.HxRefresh(w)
595}
596
597func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
598 l := rp.logger.With("handler", "LabelPanel")
599
600 f, err := rp.repoResolver.Resolve(r)
601 if err != nil {
602 l.Error("failed to get repo and knot", "err", err)
603 return
604 }
605
606 subjectStr := r.FormValue("subject")
607 subject, err := syntax.ParseATURI(subjectStr)
608 if err != nil {
609 l.Error("failed to get repo and knot", "err", err)
610 return
611 }
612
613 labelDefs, err := db.GetLabelDefinitions(
614 rp.db,
615 db.FilterIn("at_uri", f.Repo.Labels),
616 db.FilterContains("scope", subject.Collection().String()),
617 )
618 if err != nil {
619 l.Error("failed to fetch label defs", "err", err)
620 return
621 }
622
623 defs := make(map[string]*models.LabelDefinition)
624 for _, l := range labelDefs {
625 defs[l.AtUri().String()] = &l
626 }
627
628 states, err := db.GetLabels(rp.db, db.FilterEq("subject", subject))
629 if err != nil {
630 l.Error("failed to build label state", "err", err)
631 return
632 }
633 state := states[subject]
634
635 user := rp.oauth.GetUser(r)
636 rp.pages.LabelPanel(w, pages.LabelPanelParams{
637 LoggedInUser: user,
638 RepoInfo: f.RepoInfo(user),
639 Defs: defs,
640 Subject: subject.String(),
641 State: state,
642 })
643}
644
645func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
646 l := rp.logger.With("handler", "EditLabelPanel")
647
648 f, err := rp.repoResolver.Resolve(r)
649 if err != nil {
650 l.Error("failed to get repo and knot", "err", err)
651 return
652 }
653
654 subjectStr := r.FormValue("subject")
655 subject, err := syntax.ParseATURI(subjectStr)
656 if err != nil {
657 l.Error("failed to get repo and knot", "err", err)
658 return
659 }
660
661 labelDefs, err := db.GetLabelDefinitions(
662 rp.db,
663 db.FilterIn("at_uri", f.Repo.Labels),
664 db.FilterContains("scope", subject.Collection().String()),
665 )
666 if err != nil {
667 l.Error("failed to fetch labels", "err", err)
668 return
669 }
670
671 defs := make(map[string]*models.LabelDefinition)
672 for _, l := range labelDefs {
673 defs[l.AtUri().String()] = &l
674 }
675
676 states, err := db.GetLabels(rp.db, db.FilterEq("subject", subject))
677 if err != nil {
678 l.Error("failed to build label state", "err", err)
679 return
680 }
681 state := states[subject]
682
683 user := rp.oauth.GetUser(r)
684 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
685 LoggedInUser: user,
686 RepoInfo: f.RepoInfo(user),
687 Defs: defs,
688 Subject: subject.String(),
689 State: state,
690 })
691}
692
693func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
694 user := rp.oauth.GetUser(r)
695 l := rp.logger.With("handler", "AddCollaborator")
696 l = l.With("did", user.Did)
697
698 f, err := rp.repoResolver.Resolve(r)
699 if err != nil {
700 l.Error("failed to get repo and knot", "err", err)
701 return
702 }
703
704 errorId := "add-collaborator-error"
705 fail := func(msg string, err error) {
706 l.Error(msg, "err", err)
707 rp.pages.Notice(w, errorId, msg)
708 }
709
710 collaborator := r.FormValue("collaborator")
711 if collaborator == "" {
712 fail("Invalid form.", nil)
713 return
714 }
715
716 // remove a single leading `@`, to make @handle work with ResolveIdent
717 collaborator = strings.TrimPrefix(collaborator, "@")
718
719 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
720 if err != nil {
721 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
722 return
723 }
724
725 if collaboratorIdent.DID.String() == user.Did {
726 fail("You seem to be adding yourself as a collaborator.", nil)
727 return
728 }
729 l = l.With("collaborator", collaboratorIdent.Handle)
730 l = l.With("knot", f.Knot)
731
732 // announce this relation into the firehose, store into owners' pds
733 client, err := rp.oauth.AuthorizedClient(r)
734 if err != nil {
735 fail("Failed to write to PDS.", err)
736 return
737 }
738
739 // emit a record
740 currentUser := rp.oauth.GetUser(r)
741 rkey := tid.TID()
742 createdAt := time.Now()
743 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
744 Collection: tangled.RepoCollaboratorNSID,
745 Repo: currentUser.Did,
746 Rkey: rkey,
747 Record: &lexutil.LexiconTypeDecoder{
748 Val: &tangled.RepoCollaborator{
749 Subject: collaboratorIdent.DID.String(),
750 Repo: string(f.RepoAt()),
751 CreatedAt: createdAt.Format(time.RFC3339),
752 }},
753 })
754 // invalid record
755 if err != nil {
756 fail("Failed to write record to PDS.", err)
757 return
758 }
759
760 aturi := resp.Uri
761 l = l.With("at-uri", aturi)
762 l.Info("wrote record to PDS")
763
764 tx, err := rp.db.BeginTx(r.Context(), nil)
765 if err != nil {
766 fail("Failed to add collaborator.", err)
767 return
768 }
769
770 rollback := func() {
771 err1 := tx.Rollback()
772 err2 := rp.enforcer.E.LoadPolicy()
773 err3 := rollbackRecord(context.Background(), aturi, client)
774
775 // ignore txn complete errors, this is okay
776 if errors.Is(err1, sql.ErrTxDone) {
777 err1 = nil
778 }
779
780 if errs := errors.Join(err1, err2, err3); errs != nil {
781 l.Error("failed to rollback changes", "errs", errs)
782 return
783 }
784 }
785 defer rollback()
786
787 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo())
788 if err != nil {
789 fail("Failed to add collaborator permissions.", err)
790 return
791 }
792
793 err = db.AddCollaborator(tx, models.Collaborator{
794 Did: syntax.DID(currentUser.Did),
795 Rkey: rkey,
796 SubjectDid: collaboratorIdent.DID,
797 RepoAt: f.RepoAt(),
798 Created: createdAt,
799 })
800 if err != nil {
801 fail("Failed to add collaborator.", err)
802 return
803 }
804
805 err = tx.Commit()
806 if err != nil {
807 fail("Failed to add collaborator.", err)
808 return
809 }
810
811 err = rp.enforcer.E.SavePolicy()
812 if err != nil {
813 fail("Failed to update collaborator permissions.", err)
814 return
815 }
816
817 // clear aturi to when everything is successful
818 aturi = ""
819
820 rp.pages.HxRefresh(w)
821}
822
823func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
824 user := rp.oauth.GetUser(r)
825 l := rp.logger.With("handler", "DeleteRepo")
826
827 noticeId := "operation-error"
828 f, err := rp.repoResolver.Resolve(r)
829 if err != nil {
830 l.Error("failed to get repo and knot", "err", err)
831 return
832 }
833
834 // remove record from pds
835 atpClient, err := rp.oauth.AuthorizedClient(r)
836 if err != nil {
837 l.Error("failed to get authorized client", "err", err)
838 return
839 }
840 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
841 Collection: tangled.RepoNSID,
842 Repo: user.Did,
843 Rkey: f.Rkey,
844 })
845 if err != nil {
846 l.Error("failed to delete record", "err", err)
847 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
848 return
849 }
850 l.Info("removed repo record", "aturi", f.RepoAt().String())
851
852 client, err := rp.oauth.ServiceClient(
853 r,
854 oauth.WithService(f.Knot),
855 oauth.WithLxm(tangled.RepoDeleteNSID),
856 oauth.WithDev(rp.config.Core.Dev),
857 )
858 if err != nil {
859 l.Error("failed to connect to knot server", "err", err)
860 return
861 }
862
863 err = tangled.RepoDelete(
864 r.Context(),
865 client,
866 &tangled.RepoDelete_Input{
867 Did: f.OwnerDid(),
868 Name: f.Name,
869 Rkey: f.Rkey,
870 },
871 )
872 if err := xrpcclient.HandleXrpcErr(err); err != nil {
873 rp.pages.Notice(w, noticeId, err.Error())
874 return
875 }
876 l.Info("deleted repo from knot")
877
878 tx, err := rp.db.BeginTx(r.Context(), nil)
879 if err != nil {
880 l.Error("failed to start tx")
881 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
882 return
883 }
884 defer func() {
885 tx.Rollback()
886 err = rp.enforcer.E.LoadPolicy()
887 if err != nil {
888 l.Error("failed to rollback policies")
889 }
890 }()
891
892 // remove collaborator RBAC
893 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot)
894 if err != nil {
895 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
896 return
897 }
898 for _, c := range repoCollaborators {
899 did := c[0]
900 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo())
901 }
902 l.Info("removed collaborators")
903
904 // remove repo RBAC
905 err = rp.enforcer.RemoveRepo(f.OwnerDid(), f.Knot, f.DidSlashRepo())
906 if err != nil {
907 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
908 return
909 }
910
911 // remove repo from db
912 err = db.RemoveRepo(tx, f.OwnerDid(), f.Name)
913 if err != nil {
914 rp.pages.Notice(w, noticeId, "Failed to update appview")
915 return
916 }
917 l.Info("removed repo from db")
918
919 err = tx.Commit()
920 if err != nil {
921 l.Error("failed to commit changes", "err", err)
922 http.Error(w, err.Error(), http.StatusInternalServerError)
923 return
924 }
925
926 err = rp.enforcer.E.SavePolicy()
927 if err != nil {
928 l.Error("failed to update ACLs", "err", err)
929 http.Error(w, err.Error(), http.StatusInternalServerError)
930 return
931 }
932
933 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.OwnerDid()))
934}
935
936func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
937 l := rp.logger.With("handler", "SyncRepoFork")
938
939 ref := chi.URLParam(r, "ref")
940 ref, _ = url.PathUnescape(ref)
941
942 user := rp.oauth.GetUser(r)
943 f, err := rp.repoResolver.Resolve(r)
944 if err != nil {
945 l.Error("failed to resolve source repo", "err", err)
946 return
947 }
948
949 switch r.Method {
950 case http.MethodPost:
951 client, err := rp.oauth.ServiceClient(
952 r,
953 oauth.WithService(f.Knot),
954 oauth.WithLxm(tangled.RepoForkSyncNSID),
955 oauth.WithDev(rp.config.Core.Dev),
956 )
957 if err != nil {
958 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
959 return
960 }
961
962 repoInfo := f.RepoInfo(user)
963 if repoInfo.Source == nil {
964 rp.pages.Notice(w, "repo", "This repository is not a fork.")
965 return
966 }
967
968 err = tangled.RepoForkSync(
969 r.Context(),
970 client,
971 &tangled.RepoForkSync_Input{
972 Did: user.Did,
973 Name: f.Name,
974 Source: repoInfo.Source.RepoAt().String(),
975 Branch: ref,
976 },
977 )
978 if err := xrpcclient.HandleXrpcErr(err); err != nil {
979 rp.pages.Notice(w, "repo", err.Error())
980 return
981 }
982
983 rp.pages.HxRefresh(w)
984 return
985 }
986}
987
988func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
989 l := rp.logger.With("handler", "ForkRepo")
990
991 user := rp.oauth.GetUser(r)
992 f, err := rp.repoResolver.Resolve(r)
993 if err != nil {
994 l.Error("failed to resolve source repo", "err", err)
995 return
996 }
997
998 switch r.Method {
999 case http.MethodGet:
1000 user := rp.oauth.GetUser(r)
1001 knots, err := rp.enforcer.GetKnotsForUser(user.Did)
1002 if err != nil {
1003 rp.pages.Notice(w, "repo", "Invalid user account.")
1004 return
1005 }
1006
1007 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1008 LoggedInUser: user,
1009 Knots: knots,
1010 RepoInfo: f.RepoInfo(user),
1011 })
1012
1013 case http.MethodPost:
1014 l := rp.logger.With("handler", "ForkRepo")
1015
1016 targetKnot := r.FormValue("knot")
1017 if targetKnot == "" {
1018 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1019 return
1020 }
1021 l = l.With("targetKnot", targetKnot)
1022
1023 ok, err := rp.enforcer.E.Enforce(user.Did, targetKnot, targetKnot, "repo:create")
1024 if err != nil || !ok {
1025 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1026 return
1027 }
1028
1029 // choose a name for a fork
1030 forkName := r.FormValue("repo_name")
1031 if forkName == "" {
1032 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1033 return
1034 }
1035
1036 // this check is *only* to see if the forked repo name already exists
1037 // in the user's account.
1038 existingRepo, err := db.GetRepo(
1039 rp.db,
1040 db.FilterEq("did", user.Did),
1041 db.FilterEq("name", forkName),
1042 )
1043 if err != nil {
1044 if !errors.Is(err, sql.ErrNoRows) {
1045 l.Error("error fetching existing repo from db", "err", err)
1046 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1047 return
1048 }
1049 } else if existingRepo != nil {
1050 // repo with this name already exists
1051 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1052 return
1053 }
1054 l = l.With("forkName", forkName)
1055
1056 uri := "https"
1057 if rp.config.Core.Dev {
1058 uri = "http"
1059 }
1060
1061 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.OwnerDid(), f.Repo.Name)
1062 l = l.With("cloneUrl", forkSourceUrl)
1063
1064 sourceAt := f.RepoAt().String()
1065
1066 // create an atproto record for this fork
1067 rkey := tid.TID()
1068 repo := &models.Repo{
1069 Did: user.Did,
1070 Name: forkName,
1071 Knot: targetKnot,
1072 Rkey: rkey,
1073 Source: sourceAt,
1074 Description: f.Repo.Description,
1075 Created: time.Now(),
1076 Labels: rp.config.Label.DefaultLabelDefs,
1077 }
1078 record := repo.AsRecord()
1079
1080 atpClient, err := rp.oauth.AuthorizedClient(r)
1081 if err != nil {
1082 l.Error("failed to create xrpcclient", "err", err)
1083 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1084 return
1085 }
1086
1087 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1088 Collection: tangled.RepoNSID,
1089 Repo: user.Did,
1090 Rkey: rkey,
1091 Record: &lexutil.LexiconTypeDecoder{
1092 Val: &record,
1093 },
1094 })
1095 if err != nil {
1096 l.Error("failed to write to PDS", "err", err)
1097 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1098 return
1099 }
1100
1101 aturi := atresp.Uri
1102 l = l.With("aturi", aturi)
1103 l.Info("wrote to PDS")
1104
1105 tx, err := rp.db.BeginTx(r.Context(), nil)
1106 if err != nil {
1107 l.Info("txn failed", "err", err)
1108 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1109 return
1110 }
1111
1112 // The rollback function reverts a few things on failure:
1113 // - the pending txn
1114 // - the ACLs
1115 // - the atproto record created
1116 rollback := func() {
1117 err1 := tx.Rollback()
1118 err2 := rp.enforcer.E.LoadPolicy()
1119 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1120
1121 // ignore txn complete errors, this is okay
1122 if errors.Is(err1, sql.ErrTxDone) {
1123 err1 = nil
1124 }
1125
1126 if errs := errors.Join(err1, err2, err3); errs != nil {
1127 l.Error("failed to rollback changes", "errs", errs)
1128 return
1129 }
1130 }
1131 defer rollback()
1132
1133 client, err := rp.oauth.ServiceClient(
1134 r,
1135 oauth.WithService(targetKnot),
1136 oauth.WithLxm(tangled.RepoCreateNSID),
1137 oauth.WithDev(rp.config.Core.Dev),
1138 )
1139 if err != nil {
1140 l.Error("could not create service client", "err", err)
1141 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1142 return
1143 }
1144
1145 err = tangled.RepoCreate(
1146 r.Context(),
1147 client,
1148 &tangled.RepoCreate_Input{
1149 Rkey: rkey,
1150 Source: &forkSourceUrl,
1151 },
1152 )
1153 if err := xrpcclient.HandleXrpcErr(err); err != nil {
1154 rp.pages.Notice(w, "repo", err.Error())
1155 return
1156 }
1157
1158 err = db.AddRepo(tx, repo)
1159 if err != nil {
1160 l.Error("failed to AddRepo", "err", err)
1161 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1162 return
1163 }
1164
1165 // acls
1166 p, _ := securejoin.SecureJoin(user.Did, forkName)
1167 err = rp.enforcer.AddRepo(user.Did, targetKnot, p)
1168 if err != nil {
1169 l.Error("failed to add ACLs", "err", err)
1170 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1171 return
1172 }
1173
1174 err = tx.Commit()
1175 if err != nil {
1176 l.Error("failed to commit changes", "err", err)
1177 http.Error(w, err.Error(), http.StatusInternalServerError)
1178 return
1179 }
1180
1181 err = rp.enforcer.E.SavePolicy()
1182 if err != nil {
1183 l.Error("failed to update ACLs", "err", err)
1184 http.Error(w, err.Error(), http.StatusInternalServerError)
1185 return
1186 }
1187
1188 // reset the ATURI because the transaction completed successfully
1189 aturi = ""
1190
1191 rp.notifier.NewRepo(r.Context(), repo)
1192 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, forkName))
1193 }
1194}
1195
1196// this is used to rollback changes made to the PDS
1197//
1198// it is a no-op if the provided ATURI is empty
1199func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
1200 if aturi == "" {
1201 return nil
1202 }
1203
1204 parsed := syntax.ATURI(aturi)
1205
1206 collection := parsed.Collection().String()
1207 repo := parsed.Authority().String()
1208 rkey := parsed.RecordKey().String()
1209
1210 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1211 Collection: collection,
1212 Repo: repo,
1213 Rkey: rkey,
1214 })
1215 return err
1216}