forked from tangled.org/core
this repo has no description
at master 6.4 kB view raw
1package labels 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "time" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 lexutil "github.com/bluesky-social/indigo/lex/util" 15 "github.com/go-chi/chi/v5" 16 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/middleware" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/oauth" 22 "tangled.org/core/appview/pages" 23 "tangled.org/core/appview/validator" 24 "tangled.org/core/appview/xrpcclient" 25 "tangled.org/core/log" 26 "tangled.org/core/rbac" 27 "tangled.org/core/tid" 28) 29 30type Labels struct { 31 oauth *oauth.OAuth 32 pages *pages.Pages 33 db *db.DB 34 logger *slog.Logger 35 validator *validator.Validator 36 enforcer *rbac.Enforcer 37} 38 39func New( 40 oauth *oauth.OAuth, 41 pages *pages.Pages, 42 db *db.DB, 43 validator *validator.Validator, 44 enforcer *rbac.Enforcer, 45) *Labels { 46 logger := log.New("labels") 47 48 return &Labels{ 49 oauth: oauth, 50 pages: pages, 51 db: db, 52 logger: logger, 53 validator: validator, 54 enforcer: enforcer, 55 } 56} 57 58func (l *Labels) Router(mw *middleware.Middleware) http.Handler { 59 r := chi.NewRouter() 60 61 r.Use(middleware.AuthMiddleware(l.oauth)) 62 r.Put("/perform", l.PerformLabelOp) 63 64 return r 65} 66 67// this is a tricky handler implementation: 68// - the user selects the new state of all the labels in the label panel and hits save 69// - this handler should calculate the diff in order to create the labelop record 70// - we need the diff in order to maintain a "history" of operations performed by users 71func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) { 72 user := l.oauth.GetUser(r) 73 74 noticeId := "add-label-error" 75 76 fail := func(msg string, err error) { 77 l.logger.Error("failed to add label", "err", err) 78 l.pages.Notice(w, noticeId, msg) 79 } 80 81 if err := r.ParseForm(); err != nil { 82 fail("Invalid form.", err) 83 return 84 } 85 86 did := user.Did 87 rkey := tid.TID() 88 performedAt := time.Now() 89 indexedAt := time.Now() 90 repoAt := r.Form.Get("repo") 91 subjectUri := r.Form.Get("subject") 92 93 repo, err := db.GetRepo(l.db, db.FilterEq("at_uri", repoAt)) 94 if err != nil { 95 fail("Failed to get repository.", err) 96 return 97 } 98 99 // find all the labels that this repo subscribes to 100 repoLabels, err := db.GetRepoLabels(l.db, db.FilterEq("repo_at", repoAt)) 101 if err != nil { 102 fail("Failed to get labels for this repository.", err) 103 return 104 } 105 106 var labelAts []string 107 for _, rl := range repoLabels { 108 labelAts = append(labelAts, rl.LabelAt.String()) 109 } 110 111 actx, err := db.NewLabelApplicationCtx(l.db, db.FilterIn("at_uri", labelAts)) 112 if err != nil { 113 fail("Invalid form data.", err) 114 return 115 } 116 117 // calculate the start state by applying already known labels 118 existingOps, err := db.GetLabelOps(l.db, db.FilterEq("subject", subjectUri)) 119 if err != nil { 120 fail("Invalid form data.", err) 121 return 122 } 123 124 labelState := models.NewLabelState() 125 actx.ApplyLabelOps(labelState, existingOps) 126 127 var labelOps []models.LabelOp 128 129 // first delete all existing state 130 for key, vals := range labelState.Inner() { 131 for val := range vals { 132 labelOps = append(labelOps, models.LabelOp{ 133 Did: did, 134 Rkey: rkey, 135 Subject: syntax.ATURI(subjectUri), 136 Operation: models.LabelOperationDel, 137 OperandKey: key, 138 OperandValue: val, 139 PerformedAt: performedAt, 140 IndexedAt: indexedAt, 141 }) 142 } 143 } 144 145 // add all the new state the user specified 146 for key, vals := range r.Form { 147 if _, ok := actx.Defs[key]; !ok { 148 continue 149 } 150 151 for _, val := range vals { 152 labelOps = append(labelOps, models.LabelOp{ 153 Did: did, 154 Rkey: rkey, 155 Subject: syntax.ATURI(subjectUri), 156 Operation: models.LabelOperationAdd, 157 OperandKey: key, 158 OperandValue: val, 159 PerformedAt: performedAt, 160 IndexedAt: indexedAt, 161 }) 162 } 163 } 164 165 for i := range labelOps { 166 def := actx.Defs[labelOps[i].OperandKey] 167 if err := l.validator.ValidateLabelOp(def, repo, &labelOps[i]); err != nil { 168 fail(fmt.Sprintf("Invalid form data: %s", err), err) 169 return 170 } 171 } 172 173 // reduce the opset 174 labelOps = models.ReduceLabelOps(labelOps) 175 176 // next, apply all ops introduced in this request and filter out ones that are no-ops 177 validLabelOps := labelOps[:0] 178 for _, op := range labelOps { 179 if err = actx.ApplyLabelOp(labelState, op); err != models.LabelNoOpError { 180 validLabelOps = append(validLabelOps, op) 181 } 182 } 183 184 // nothing to do 185 if len(validLabelOps) == 0 { 186 l.pages.HxRefresh(w) 187 return 188 } 189 190 // create an atproto record of valid ops 191 record := models.LabelOpsAsRecord(validLabelOps) 192 193 client, err := l.oauth.AuthorizedClient(r) 194 if err != nil { 195 fail("Failed to authorize user.", err) 196 return 197 } 198 199 resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 200 Collection: tangled.LabelOpNSID, 201 Repo: did, 202 Rkey: rkey, 203 Record: &lexutil.LexiconTypeDecoder{ 204 Val: &record, 205 }, 206 }) 207 if err != nil { 208 fail("Failed to create record on PDS for user.", err) 209 return 210 } 211 atUri := resp.Uri 212 213 tx, err := l.db.BeginTx(r.Context(), nil) 214 if err != nil { 215 fail("Failed to update labels. Try again later.", err) 216 return 217 } 218 219 rollback := func() { 220 err1 := tx.Rollback() 221 err2 := rollbackRecord(context.Background(), atUri, client) 222 223 // ignore txn complete errors, this is okay 224 if errors.Is(err1, sql.ErrTxDone) { 225 err1 = nil 226 } 227 228 if errs := errors.Join(err1, err2); errs != nil { 229 return 230 } 231 } 232 defer rollback() 233 234 for _, o := range validLabelOps { 235 if _, err := db.AddLabelOp(l.db, &o); err != nil { 236 fail("Failed to update labels. Try again later.", err) 237 return 238 } 239 } 240 241 err = tx.Commit() 242 if err != nil { 243 return 244 } 245 246 // clear aturi when everything is successful 247 atUri = "" 248 249 l.pages.HxRefresh(w) 250} 251 252// this is used to rollback changes made to the PDS 253// 254// it is a no-op if the provided ATURI is empty 255func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 256 if aturi == "" { 257 return nil 258 } 259 260 parsed := syntax.ATURI(aturi) 261 262 collection := parsed.Collection().String() 263 repo := parsed.Authority().String() 264 rkey := parsed.RecordKey().String() 265 266 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 267 Collection: collection, 268 Repo: repo, 269 Rkey: rkey, 270 }) 271 return err 272}