···
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
lexutil "github.com/bluesky-social/indigo/lex/util"
+
"github.com/go-chi/chi/v5"
+
"tangled.sh/tangled.sh/core/api/tangled"
+
"tangled.sh/tangled.sh/core/appview/config"
+
"tangled.sh/tangled.sh/core/appview/db"
+
"tangled.sh/tangled.sh/core/appview/middleware"
+
"tangled.sh/tangled.sh/core/appview/oauth"
+
"tangled.sh/tangled.sh/core/appview/pages"
+
"tangled.sh/tangled.sh/core/appview/reporesolver"
+
"tangled.sh/tangled.sh/core/appview/xrpcclient"
+
"tangled.sh/tangled.sh/core/eventconsumer"
+
"tangled.sh/tangled.sh/core/idresolver"
+
"tangled.sh/tangled.sh/core/log"
+
"tangled.sh/tangled.sh/core/rbac"
+
"tangled.sh/tangled.sh/core/tid"
+
repoResolver *reporesolver.RepoResolver
+
idResolver *idresolver.Resolver
+
repoResolver *reporesolver.RepoResolver,
+
spindlestream *eventconsumer.Consumer,
+
idResolver *idresolver.Resolver,
+
enforcer *rbac.Enforcer,
+
logger := log.New("labels")
+
repoResolver: repoResolver,
+
idResolver: idResolver,
+
func (l *Labels) Router(mw *middleware.Middleware) http.Handler {
+
r.With(middleware.AuthMiddleware(l.oauth)).Put("/perform", l.PerformLabelOp)
+
func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) {
+
user := l.oauth.GetUser(r)
+
if err := r.ParseForm(); err != nil {
+
l.logger.Error("failed to parse form data", "error", err)
+
http.Error(w, "Invalid form data", http.StatusBadRequest)
+
performedAt := time.Now()
+
indexedAt := time.Now()
+
repoAt := r.Form.Get("repo")
+
subjectUri := r.Form.Get("subject")
+
keys := r.Form["operand-key"]
+
vals := r.Form["operand-val"]
+
var labelOps []db.LabelOp
+
for i := range len(keys) {
+
op := r.FormValue(fmt.Sprintf("op-%d", i))
+
op = string(db.LabelOperationDel)
+
labelOps = append(labelOps, db.LabelOp{
+
Subject: syntax.ATURI(subjectUri),
+
Operation: db.LabelOperation(op),
+
PerformedAt: performedAt,
+
// TODO: validate the operations
+
// find all the labels that this repo subscribes to
+
repoLabels, err := db.GetRepoLabels(l.db, db.FilterEq("repo_at", repoAt))
+
http.Error(w, "Invalid form data", http.StatusBadRequest)
+
for _, rl := range repoLabels {
+
labelAts = append(labelAts, rl.LabelAt.String())
+
actx, err := db.NewLabelApplicationCtx(l.db, db.FilterIn("at_uri", labelAts))
+
http.Error(w, "Invalid form data", http.StatusBadRequest)
+
// calculate the start state by applying already known labels
+
existingOps, err := db.GetLabelOps(l.db, db.FilterEq("subject", subjectUri))
+
http.Error(w, "Invalid form data", http.StatusBadRequest)
+
labelState := db.NewLabelState()
+
actx.ApplyLabelOps(labelState, existingOps)
+
// next, apply all ops introduced in this request and filter out ones that are no-ops
+
validLabelOps := labelOps[:0]
+
for _, op := range labelOps {
+
if err = actx.ApplyLabelOp(labelState, op); err != db.LabelNoOpError {
+
validLabelOps = append(validLabelOps, op)
+
if len(validLabelOps) == 0 {
+
// create an atproto record of valid ops
+
record := db.LabelOpsAsRecord(validLabelOps)
+
client, err := l.oauth.AuthorizedClient(r)
+
l.logger.Error("failed to create client", "error", err)
+
http.Error(w, "Invalid form data", http.StatusBadRequest)
+
resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
+
Collection: tangled.LabelOpNSID,
+
Record: &lexutil.LexiconTypeDecoder{
+
l.logger.Error("failed to write to PDS", "error", err)
+
http.Error(w, "failed to write to PDS", http.StatusInternalServerError)
+
tx, err := l.db.BeginTx(r.Context(), nil)
+
l.logger.Error("failed to start tx", "error", err)
+
err2 := rollbackRecord(context.Background(), atUri, client)
+
// ignore txn complete errors, this is okay
+
if errors.Is(err1, sql.ErrTxDone) {
+
if errs := errors.Join(err1, err2); errs != nil {
+
for _, o := range validLabelOps {
+
if _, err := db.AddLabelOp(l.db, &o); err != nil {
+
l.logger.Error("failed to add op", "err", err)
+
l.logger.Info("performed label op", "did", o.Did, "rkey", o.Rkey, "kind", o.Operation, "subjcet", o.Subject, "key", o.OperandKey)
+
// clear aturi when everything is successful
+
// this is used to rollback changes made to the PDS
+
// it is a no-op if the provided ATURI is empty
+
func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
+
parsed := syntax.ATURI(aturi)
+
collection := parsed.Collection().String()
+
repo := parsed.Authority().String()
+
rkey := parsed.RecordKey().String()
+
_, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
+
Collection: collection,