forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package labels 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "time" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 lexutil "github.com/bluesky-social/indigo/lex/util" 15 "github.com/go-chi/chi/v5" 16 17 "tangled.sh/tangled.sh/core/api/tangled" 18 "tangled.sh/tangled.sh/core/appview/db" 19 "tangled.sh/tangled.sh/core/appview/middleware" 20 "tangled.sh/tangled.sh/core/appview/models" 21 "tangled.sh/tangled.sh/core/appview/oauth" 22 "tangled.sh/tangled.sh/core/appview/pages" 23 "tangled.sh/tangled.sh/core/appview/validator" 24 "tangled.sh/tangled.sh/core/appview/xrpcclient" 25 "tangled.sh/tangled.sh/core/log" 26 "tangled.sh/tangled.sh/core/tid" 27) 28 29type Labels struct { 30 oauth *oauth.OAuth 31 pages *pages.Pages 32 db *db.DB 33 logger *slog.Logger 34 validator *validator.Validator 35} 36 37func New( 38 oauth *oauth.OAuth, 39 pages *pages.Pages, 40 db *db.DB, 41 validator *validator.Validator, 42) *Labels { 43 logger := log.New("labels") 44 45 return &Labels{ 46 oauth: oauth, 47 pages: pages, 48 db: db, 49 logger: logger, 50 validator: validator, 51 } 52} 53 54func (l *Labels) Router(mw *middleware.Middleware) http.Handler { 55 r := chi.NewRouter() 56 57 r.Use(middleware.AuthMiddleware(l.oauth)) 58 r.Put("/perform", l.PerformLabelOp) 59 60 return r 61} 62 63// this is a tricky handler implementation: 64// - the user selects the new state of all the labels in the label panel and hits save 65// - this handler should calculate the diff in order to create the labelop record 66// - we need the diff in order to maintain a "history" of operations performed by users 67func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) { 68 user := l.oauth.GetUser(r) 69 70 noticeId := "add-label-error" 71 72 fail := func(msg string, err error) { 73 l.logger.Error("failed to add label", "err", err) 74 l.pages.Notice(w, noticeId, msg) 75 } 76 77 if err := r.ParseForm(); err != nil { 78 fail("Invalid form.", err) 79 return 80 } 81 82 did := user.Did 83 rkey := tid.TID() 84 performedAt := time.Now() 85 indexedAt := time.Now() 86 repoAt := r.Form.Get("repo") 87 subjectUri := r.Form.Get("subject") 88 89 // find all the labels that this repo subscribes to 90 repoLabels, err := db.GetRepoLabels(l.db, db.FilterEq("repo_at", repoAt)) 91 if err != nil { 92 fail("Failed to get labels for this repository.", err) 93 return 94 } 95 96 var labelAts []string 97 for _, rl := range repoLabels { 98 labelAts = append(labelAts, rl.LabelAt.String()) 99 } 100 101 actx, err := db.NewLabelApplicationCtx(l.db, db.FilterIn("at_uri", labelAts)) 102 if err != nil { 103 fail("Invalid form data.", err) 104 return 105 } 106 107 l.logger.Info("actx", "labels", labelAts) 108 l.logger.Info("actx", "defs", actx.Defs) 109 110 // calculate the start state by applying already known labels 111 existingOps, err := db.GetLabelOps(l.db, db.FilterEq("subject", subjectUri)) 112 if err != nil { 113 fail("Invalid form data.", err) 114 return 115 } 116 117 labelState := models.NewLabelState() 118 actx.ApplyLabelOps(labelState, existingOps) 119 120 var labelOps []models.LabelOp 121 122 // first delete all existing state 123 for key, vals := range labelState.Inner() { 124 for val := range vals { 125 labelOps = append(labelOps, models.LabelOp{ 126 Did: did, 127 Rkey: rkey, 128 Subject: syntax.ATURI(subjectUri), 129 Operation: models.LabelOperationDel, 130 OperandKey: key, 131 OperandValue: val, 132 PerformedAt: performedAt, 133 IndexedAt: indexedAt, 134 }) 135 } 136 } 137 138 // add all the new state the user specified 139 for key, vals := range r.Form { 140 if _, ok := actx.Defs[key]; !ok { 141 continue 142 } 143 144 for _, val := range vals { 145 labelOps = append(labelOps, models.LabelOp{ 146 Did: did, 147 Rkey: rkey, 148 Subject: syntax.ATURI(subjectUri), 149 Operation: models.LabelOperationAdd, 150 OperandKey: key, 151 OperandValue: val, 152 PerformedAt: performedAt, 153 IndexedAt: indexedAt, 154 }) 155 } 156 } 157 158 // reduce the opset 159 labelOps = models.ReduceLabelOps(labelOps) 160 161 for i := range labelOps { 162 def := actx.Defs[labelOps[i].OperandKey] 163 if err := l.validator.ValidateLabelOp(def, &labelOps[i]); err != nil { 164 fail(fmt.Sprintf("Invalid form data: %s", err), err) 165 return 166 } 167 } 168 169 // next, apply all ops introduced in this request and filter out ones that are no-ops 170 validLabelOps := labelOps[:0] 171 for _, op := range labelOps { 172 if err = actx.ApplyLabelOp(labelState, op); err != models.LabelNoOpError { 173 validLabelOps = append(validLabelOps, op) 174 } 175 } 176 177 // nothing to do 178 if len(validLabelOps) == 0 { 179 l.pages.HxRefresh(w) 180 return 181 } 182 183 // create an atproto record of valid ops 184 record := models.LabelOpsAsRecord(validLabelOps) 185 186 client, err := l.oauth.AuthorizedClient(r) 187 if err != nil { 188 fail("Failed to authorize user.", err) 189 return 190 } 191 192 resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 193 Collection: tangled.LabelOpNSID, 194 Repo: did, 195 Rkey: rkey, 196 Record: &lexutil.LexiconTypeDecoder{ 197 Val: &record, 198 }, 199 }) 200 if err != nil { 201 fail("Failed to create record on PDS for user.", err) 202 return 203 } 204 atUri := resp.Uri 205 206 tx, err := l.db.BeginTx(r.Context(), nil) 207 if err != nil { 208 fail("Failed to update labels. Try again later.", err) 209 return 210 } 211 212 rollback := func() { 213 err1 := tx.Rollback() 214 err2 := rollbackRecord(context.Background(), atUri, client) 215 216 // ignore txn complete errors, this is okay 217 if errors.Is(err1, sql.ErrTxDone) { 218 err1 = nil 219 } 220 221 if errs := errors.Join(err1, err2); errs != nil { 222 return 223 } 224 } 225 defer rollback() 226 227 for _, o := range validLabelOps { 228 if _, err := db.AddLabelOp(l.db, &o); err != nil { 229 fail("Failed to update labels. Try again later.", err) 230 return 231 } 232 } 233 234 err = tx.Commit() 235 if err != nil { 236 return 237 } 238 239 // clear aturi when everything is successful 240 atUri = "" 241 242 l.pages.HxRefresh(w) 243} 244 245// this is used to rollback changes made to the PDS 246// 247// it is a no-op if the provided ATURI is empty 248func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 249 if aturi == "" { 250 return nil 251 } 252 253 parsed := syntax.ATURI(aturi) 254 255 collection := parsed.Collection().String() 256 repo := parsed.Authority().String() 257 rkey := parsed.RecordKey().String() 258 259 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 260 Collection: collection, 261 Repo: repo, 262 Rkey: rkey, 263 }) 264 return err 265}