forked from tangled.org/core
this repo has no description

appview,knotserver: improve txn handling around repo creation and forking

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li 08fa350b 737b1a7e

verified
Changed files
+207 -109
appview
pages
templates
repo
state
knotserver
xrpc
+8 -2
appview/pages/templates/repo/fork.html
···
<p class="text-xl font-bold dark:text-white">Fork {{ .RepoInfo.FullName }}</p>
</div>
<div class="p-6 bg-white dark:bg-gray-800 drop-shadow-sm rounded">
-
<form hx-post="/{{ .RepoInfo.FullName }}/fork" class="space-y-12" hx-swap="none">
+
<form hx-post="/{{ .RepoInfo.FullName }}/fork" class="space-y-12" hx-swap="none" hx-indicator="#spinner">
<fieldset class="space-y-3">
<legend class="dark:text-white">Select a knot to fork into</legend>
<div class="space-y-2">
···
</fieldset>
<div class="space-y-2">
-
<button type="submit" class="btn">fork repo</button>
+
<button type="submit" class="btn-create flex items-center gap-2">
+
{{ i "git-fork" "w-4 h-4" }}
+
fork repo
+
<span id="spinner" class="group">
+
{{ i "loader-circle" "w-4 h-4 animate-spin hidden group-[.htmx-request]:inline" }}
+
</span>
+
</button>
<div id="repo" class="error"></div>
</div>
</form>
+1 -1
appview/pages/templates/repo/new.html
···
<button type="submit" class="btn-create flex items-center gap-2">
{{ i "book-plus" "w-4 h-4" }}
create repo
-
<span id="create-pull-spinner" class="group">
+
<span id="spinner" class="group">
{{ i "loader-circle" "w-4 h-4 animate-spin hidden group-[.htmx-request]:inline" }}
</span>
</button>
+102 -64
appview/repo/repo.go
···
"tangled.sh/tangled.sh/core/appview/pages"
"tangled.sh/tangled.sh/core/appview/pages/markup"
"tangled.sh/tangled.sh/core/appview/reporesolver"
+
xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient"
"tangled.sh/tangled.sh/core/eventconsumer"
"tangled.sh/tangled.sh/core/idresolver"
"tangled.sh/tangled.sh/core/knotclient"
···
})
case http.MethodPost:
+
l := rp.logger.With("handler", "ForkRepo")
-
knot := r.FormValue("knot")
-
if knot == "" {
+
targetKnot := r.FormValue("knot")
+
if targetKnot == "" {
rp.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.")
return
+
l = l.With("targetKnot", targetKnot)
-
ok, err := rp.enforcer.E.Enforce(user.Did, knot, knot, "repo:create")
+
ok, err := rp.enforcer.E.Enforce(user.Did, targetKnot, targetKnot, "repo:create")
if err != nil || !ok {
rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
return
-
forkName := fmt.Sprintf("%s", f.Name)
-
+
// choose a name for a fork
+
forkName := f.Name
// this check is *only* to see if the forked repo name already exists
// in the user's account.
existingRepo, err := db.GetRepo(rp.db, user.Did, f.Name)
···
// repo with this name already exists, append random string
forkName = fmt.Sprintf("%s-%s", forkName, randomString(3))
-
client, err := rp.oauth.ServiceClient(
-
r,
-
oauth.WithService(knot),
-
oauth.WithLxm(tangled.RepoForkNSID),
-
oauth.WithDev(rp.config.Core.Dev),
-
)
-
-
if err != nil {
-
log.Printf("error creating client for knot server: %v", err)
-
rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
-
return
-
}
+
l = l.With("forkName", forkName)
-
var uri string
+
uri := "https"
if rp.config.Core.Dev {
uri = "http"
-
} else {
-
uri = "https"
+
forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.OwnerDid(), f.Repo.Name)
+
l = l.With("cloneUrl", forkSourceUrl)
+
sourceAt := f.RepoAt().String()
+
// create an atproto record for this fork
rkey := tid.TID()
repo := &db.Repo{
Did: user.Did,
Name: forkName,
-
Knot: knot,
+
Knot: targetKnot,
Rkey: rkey,
Source: sourceAt,
-
tx, err := rp.db.BeginTx(r.Context(), nil)
-
if err != nil {
-
log.Println(err)
-
rp.pages.Notice(w, "repo", "Failed to save repository information.")
-
return
-
}
-
defer func() {
-
tx.Rollback()
-
err = rp.enforcer.E.LoadPolicy()
-
if err != nil {
-
log.Println("failed to rollback policies")
-
}
-
}()
-
-
err = tangled.RepoFork(
-
r.Context(),
-
client,
-
&tangled.RepoFork_Input{
-
Did: user.Did,
-
Name: &forkName,
-
Source: forkSourceUrl,
-
},
-
)
-
-
if err != nil {
-
xe, err := xrpcerr.Unmarshal(err.Error())
-
if err != nil {
-
log.Println(err)
-
rp.pages.Notice(w, "repo", "Failed to create repository on knot server.")
-
return
-
}
-
-
log.Println(xe.Error())
-
rp.pages.Notice(w, "repo", fmt.Sprintf("Failed to create repository on knot server: %s.", xe.Message))
-
return
-
}
-
xrpcClient, err := rp.oauth.AuthorizedClient(r)
if err != nil {
-
log.Println("failed to get authorized client", err)
-
rp.pages.Notice(w, "repo", "Failed to create repository.")
+
l.Error("failed to create xrpcclient", "err", err)
+
rp.pages.Notice(w, "repo", "Failed to fork repository.")
return
···
}},
})
if err != nil {
-
log.Printf("failed to create record: %s", err)
+
l.Error("failed to write to PDS", "err", err)
rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
return
-
log.Println("created repo record: ", atresp.Uri)
+
+
aturi := atresp.Uri
+
l = l.With("aturi", aturi)
+
l.Info("wrote to PDS")
+
+
tx, err := rp.db.BeginTx(r.Context(), nil)
+
if err != nil {
+
l.Info("txn failed", "err", err)
+
rp.pages.Notice(w, "repo", "Failed to save repository information.")
+
return
+
}
+
+
// The rollback function reverts a few things on failure:
+
// - the pending txn
+
// - the ACLs
+
// - the atproto record created
+
rollback := func() {
+
err1 := tx.Rollback()
+
err2 := rp.enforcer.E.LoadPolicy()
+
err3 := rollbackRecord(context.Background(), aturi, xrpcClient)
+
+
// ignore txn complete errors, this is okay
+
if errors.Is(err1, sql.ErrTxDone) {
+
err1 = nil
+
}
+
+
if errs := errors.Join(err1, err2, err3); errs != nil {
+
l.Error("failed to rollback changes", "errs", errs)
+
return
+
}
+
}
+
defer rollback()
+
+
client, err := rp.oauth.ServiceClient(
+
r,
+
oauth.WithService(targetKnot),
+
oauth.WithLxm(tangled.RepoCreateNSID),
+
oauth.WithDev(rp.config.Core.Dev),
+
)
+
if err != nil {
+
l.Error("could not create service client", "err", err)
+
rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
+
return
+
}
+
+
err = tangled.RepoCreate(
+
r.Context(),
+
client,
+
&tangled.RepoCreate_Input{
+
Rkey: rkey,
+
Source: &forkSourceUrl,
+
},
+
)
+
if err := xrpcclient.HandleXrpcErr(err); err != nil {
+
rp.pages.Notice(w, "repo", err.Error())
+
return
+
}
err = db.AddRepo(tx, repo)
if err != nil {
···
// acls
p, _ := securejoin.SecureJoin(user.Did, forkName)
-
err = rp.enforcer.AddRepo(user.Did, knot, p)
+
err = rp.enforcer.AddRepo(user.Did, targetKnot, p)
if err != nil {
log.Println(err)
rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
···
return
+
// reset the ATURI because the transaction completed successfully
+
aturi = ""
+
+
rp.notifier.NewRepo(r.Context(), repo)
rp.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, forkName))
-
return
+
}
+
}
+
+
// this is used to rollback changes made to the PDS
+
//
+
// it is a no-op if the provided ATURI is empty
+
func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
+
if aturi == "" {
+
return nil
+
+
parsed := syntax.ATURI(aturi)
+
+
collection := parsed.Collection().String()
+
repo := parsed.Authority().String()
+
rkey := parsed.RecordKey().String()
+
+
_, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
+
Collection: collection,
+
Repo: repo,
+
Rkey: rkey,
+
})
+
return err
func (rp *Repo) RepoCompareNew(w http.ResponseWriter, r *http.Request) {
+88 -37
appview/state/state.go
···
import (
"context"
+
"database/sql"
+
"errors"
"fmt"
"log"
"log/slog"
···
"time"
comatproto "github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/atproto/syntax"
lexutil "github.com/bluesky-social/indigo/lex/util"
securejoin "github.com/cyphar/filepath-securejoin"
"github.com/go-chi/chi/v5"
···
"tangled.sh/tangled.sh/core/appview/pages"
posthogService "tangled.sh/tangled.sh/core/appview/posthog"
"tangled.sh/tangled.sh/core/appview/reporesolver"
+
xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient"
"tangled.sh/tangled.sh/core/eventconsumer"
"tangled.sh/tangled.sh/core/idresolver"
"tangled.sh/tangled.sh/core/jetstream"
···
repoResolver *reporesolver.RepoResolver
knotstream *eventconsumer.Consumer
spindlestream *eventconsumer.Consumer
+
logger *slog.Logger
}
func Make(ctx context.Context, config *config.Config) (*State, error) {
···
repoResolver,
knotstream,
spindlestream,
+
slog.Default(),
}
return state, nil
···
})
case http.MethodPost:
+
l := s.logger.With("handler", "NewRepo")
+
user := s.oauth.GetUser(r)
+
l = l.With("did", user.Did)
+
l = l.With("handle", user.Handle)
+
// form validation
domain := r.FormValue("domain")
if domain == "" {
s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.")
return
}
+
l = l.With("knot", domain)
repoName := r.FormValue("name")
if repoName == "" {
···
s.pages.Notice(w, "repo", err.Error())
return
}
-
repoName = stripGitExt(repoName)
+
l = l.With("repoName", repoName)
defaultBranch := r.FormValue("branch")
if defaultBranch == "" {
defaultBranch = "main"
}
+
l = l.With("defaultBranch", defaultBranch)
description := r.FormValue("description")
+
// ACL validation
ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
if err != nil || !ok {
+
l.Info("unauthorized")
s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
return
}
+
// Check for existing repos
existingRepo, err := db.GetRepo(s.db, user.Did, repoName)
if err == nil && existingRepo != nil {
l.Info("repo exists")
···
return
}
-
client, err := s.oauth.ServiceClient(
-
r,
-
oauth.WithService(domain),
-
oauth.WithLxm(tangled.RepoCreateNSID),
-
oauth.WithDev(s.config.Core.Dev),
-
)
-
-
if err != nil {
-
s.pages.Notice(w, "repo", "Failed to connect to knot server.")
-
return
-
}
-
+
// create atproto record for this repo
rkey := tid.TID()
repo := &db.Repo{
Did: user.Did,
···
xrpcClient, err := s.oauth.AuthorizedClient(r)
if err != nil {
+
l.Info("PDS write failed", "err", err)
s.pages.Notice(w, "repo", "Failed to write record to PDS.")
return
}
···
}},
})
if err != nil {
-
log.Printf("failed to create record: %s", err)
+
l.Info("PDS write failed", "err", err)
s.pages.Notice(w, "repo", "Failed to announce repository creation.")
return
}
-
log.Println("created repo record: ", atresp.Uri)
+
+
aturi := atresp.Uri
+
l = l.With("aturi", aturi)
+
l.Info("wrote to PDS")
tx, err := s.db.BeginTx(r.Context(), nil)
if err != nil {
-
log.Println(err)
+
l.Info("txn failed", "err", err)
s.pages.Notice(w, "repo", "Failed to save repository information.")
return
}
-
defer func() {
-
tx.Rollback()
-
err = s.enforcer.E.LoadPolicy()
-
if err != nil {
-
log.Println("failed to rollback policies")
+
+
// The rollback function reverts a few things on failure:
+
// - the pending txn
+
// - the ACLs
+
// - the atproto record created
+
rollback := func() {
+
err1 := tx.Rollback()
+
err2 := s.enforcer.E.LoadPolicy()
+
err3 := rollbackRecord(context.Background(), aturi, xrpcClient)
+
+
// ignore txn complete errors, this is okay
+
if errors.Is(err1, sql.ErrTxDone) {
+
err1 = nil
+
}
+
+
if errs := errors.Join(err1, err2, err3); errs != nil {
+
l.Error("failed to rollback changes", "errs", errs)
+
return
}
-
}()
+
}
+
defer rollback()
+
+
client, err := s.oauth.ServiceClient(
+
r,
+
oauth.WithService(domain),
+
oauth.WithLxm(tangled.RepoCreateNSID),
+
oauth.WithDev(s.config.Core.Dev),
+
)
+
if err != nil {
+
l.Error("service auth failed", "err", err)
+
s.pages.Notice(w, "repo", "Failed to reach PDS.")
+
return
+
}
xe := tangled.RepoCreate(
r.Context(),
···
},
)
if err != nil {
-
xe, err := xrpcerr.Unmarshal(err.Error())
-
if err != nil {
-
log.Println(err)
-
s.pages.Notice(w, "repo", "Failed to create repository on knot server.")
-
return
-
}
-
-
log.Println(xe.Error())
-
s.pages.Notice(w, "repo", fmt.Sprintf("Failed to create repository on knot server: %s.", xe.Message))
+
l.Error("xrpc request failed", "err", err)
+
s.pages.Notice(w, "repo", fmt.Sprintf("Failed to create repository on knot server: %s.", err.Error()))
return
}
err = db.AddRepo(tx, repo)
if err != nil {
-
log.Println(err)
+
l.Error("db write failed", "err", err)
s.pages.Notice(w, "repo", "Failed to save repository information.")
return
}
···
p, _ := securejoin.SecureJoin(user.Did, repoName)
err = s.enforcer.AddRepo(user.Did, domain, p)
if err != nil {
-
log.Println(err)
+
l.Error("acl setup failed", "err", err)
s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
return
}
err = tx.Commit()
if err != nil {
-
log.Println("failed to commit changes", err)
+
l.Error("txn commit failed", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = s.enforcer.E.SavePolicy()
if err != nil {
-
log.Println("failed to update ACLs", err)
+
l.Error("acl save failed", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+
// reset the ATURI because the transaction completed successfully
+
aturi = ""
+
s.notifier.NewRepo(r.Context(), repo)
+
s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
+
}
+
}
-
s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
-
return
+
// this is used to rollback changes made to the PDS
+
//
+
// it is a no-op if the provided ATURI is empty
+
func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
+
if aturi == "" {
+
return nil
}
+
+
parsed := syntax.ATURI(aturi)
+
+
collection := parsed.Collection().String()
+
repo := parsed.Authority().String()
+
rkey := parsed.RecordKey().String()
+
+
_, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
+
Collection: collection,
+
Repo: repo,
+
Rkey: rkey,
+
})
+
return err
}
+8 -5
knotserver/xrpc/router.go knotserver/xrpc/xrpc.go
···
func (x *Xrpc) Router() http.Handler {
r := chi.NewRouter()
+
r.Group(func(r chi.Router) {
r.Use(x.ServiceAuth.VerifyServiceAuth)
r.Post("/"+tangled.RepoSetDefaultBranchNSID, x.SetDefaultBranch)
r.Post("/"+tangled.RepoCreateNSID, x.CreateRepo)
-
r.Post("/"+tangled.RepoDeleteNSID, x.DeleteRepo)
-
r.Post("/"+tangled.RepoForkNSID, x.ForkRepo)
r.Post("/"+tangled.RepoForkStatusNSID, x.ForkStatus)
r.Post("/"+tangled.RepoForkSyncNSID, x.ForkSync)
-
r.Post("/"+tangled.RepoHiddenRefNSID, x.HiddenRef)
-
r.Post("/"+tangled.RepoMergeNSID, x.Merge)
-
r.Post("/"+tangled.RepoMergeCheckNSID, x.MergeCheck)
})
+
+
// merge check is an open endpoint
+
//
+
// TODO: should we constrain this more?
+
// - we can calculate on PR submit/resubmit/gitRefUpdate etc.
+
// - use ETags on clients to keep requests to a minimum
+
r.Post("/"+tangled.RepoMergeCheckNSID, x.MergeCheck)
return r
}