forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 6.4 kB view raw
1package labels 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "time" 11 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/middleware" 15 "tangled.org/core/appview/models" 16 "tangled.org/core/appview/oauth" 17 "tangled.org/core/appview/pages" 18 "tangled.org/core/appview/validator" 19 "tangled.org/core/rbac" 20 "tangled.org/core/tid" 21 22 comatproto "github.com/bluesky-social/indigo/api/atproto" 23 atpclient "github.com/bluesky-social/indigo/atproto/client" 24 "github.com/bluesky-social/indigo/atproto/syntax" 25 lexutil "github.com/bluesky-social/indigo/lex/util" 26 "github.com/go-chi/chi/v5" 27) 28 29type Labels struct { 30 oauth *oauth.OAuth 31 pages *pages.Pages 32 db *db.DB 33 logger *slog.Logger 34 validator *validator.Validator 35 enforcer *rbac.Enforcer 36} 37 38func New( 39 oauth *oauth.OAuth, 40 pages *pages.Pages, 41 db *db.DB, 42 validator *validator.Validator, 43 enforcer *rbac.Enforcer, 44 logger *slog.Logger, 45) *Labels { 46 return &Labels{ 47 oauth: oauth, 48 pages: pages, 49 db: db, 50 logger: logger, 51 validator: validator, 52 enforcer: enforcer, 53 } 54} 55 56func (l *Labels) Router() http.Handler { 57 r := chi.NewRouter() 58 59 r.Use(middleware.AuthMiddleware(l.oauth)) 60 r.Put("/perform", l.PerformLabelOp) 61 62 return r 63} 64 65// this is a tricky handler implementation: 66// - the user selects the new state of all the labels in the label panel and hits save 67// - this handler should calculate the diff in order to create the labelop record 68// - we need the diff in order to maintain a "history" of operations performed by users 69func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) { 70 user := l.oauth.GetUser(r) 71 72 noticeId := "add-label-error" 73 74 fail := func(msg string, err error) { 75 l.logger.Error("failed to add label", "err", err) 76 l.pages.Notice(w, noticeId, msg) 77 } 78 79 if err := r.ParseForm(); err != nil { 80 fail("Invalid form.", err) 81 return 82 } 83 84 did := user.Did 85 rkey := tid.TID() 86 performedAt := time.Now() 87 indexedAt := time.Now() 88 repoAt := r.Form.Get("repo") 89 subjectUri := r.Form.Get("subject") 90 91 repo, err := db.GetRepo(l.db, db.FilterEq("at_uri", repoAt)) 92 if err != nil { 93 fail("Failed to get repository.", err) 94 return 95 } 96 97 // find all the labels that this repo subscribes to 98 repoLabels, err := db.GetRepoLabels(l.db, db.FilterEq("repo_at", repoAt)) 99 if err != nil { 100 fail("Failed to get labels for this repository.", err) 101 return 102 } 103 104 var labelAts []string 105 for _, rl := range repoLabels { 106 labelAts = append(labelAts, rl.LabelAt.String()) 107 } 108 109 actx, err := db.NewLabelApplicationCtx(l.db, db.FilterIn("at_uri", labelAts)) 110 if err != nil { 111 fail("Invalid form data.", err) 112 return 113 } 114 115 // calculate the start state by applying already known labels 116 existingOps, err := db.GetLabelOps(l.db, db.FilterEq("subject", subjectUri)) 117 if err != nil { 118 fail("Invalid form data.", err) 119 return 120 } 121 122 labelState := models.NewLabelState() 123 actx.ApplyLabelOps(labelState, existingOps) 124 125 var labelOps []models.LabelOp 126 127 // first delete all existing state 128 for key, vals := range labelState.Inner() { 129 for val := range vals { 130 labelOps = append(labelOps, models.LabelOp{ 131 Did: did, 132 Rkey: rkey, 133 Subject: syntax.ATURI(subjectUri), 134 Operation: models.LabelOperationDel, 135 OperandKey: key, 136 OperandValue: val, 137 PerformedAt: performedAt, 138 IndexedAt: indexedAt, 139 }) 140 } 141 } 142 143 // add all the new state the user specified 144 for key, vals := range r.Form { 145 if _, ok := actx.Defs[key]; !ok { 146 continue 147 } 148 149 for _, val := range vals { 150 labelOps = append(labelOps, models.LabelOp{ 151 Did: did, 152 Rkey: rkey, 153 Subject: syntax.ATURI(subjectUri), 154 Operation: models.LabelOperationAdd, 155 OperandKey: key, 156 OperandValue: val, 157 PerformedAt: performedAt, 158 IndexedAt: indexedAt, 159 }) 160 } 161 } 162 163 for i := range labelOps { 164 def := actx.Defs[labelOps[i].OperandKey] 165 if err := l.validator.ValidateLabelOp(def, repo, &labelOps[i]); err != nil { 166 fail(fmt.Sprintf("Invalid form data: %s", err), err) 167 return 168 } 169 } 170 171 // reduce the opset 172 labelOps = models.ReduceLabelOps(labelOps) 173 174 // next, apply all ops introduced in this request and filter out ones that are no-ops 175 validLabelOps := labelOps[:0] 176 for _, op := range labelOps { 177 if err = actx.ApplyLabelOp(labelState, op); err != models.LabelNoOpError { 178 validLabelOps = append(validLabelOps, op) 179 } 180 } 181 182 // nothing to do 183 if len(validLabelOps) == 0 { 184 l.pages.HxRefresh(w) 185 return 186 } 187 188 // create an atproto record of valid ops 189 record := models.LabelOpsAsRecord(validLabelOps) 190 191 client, err := l.oauth.AuthorizedClient(r) 192 if err != nil { 193 fail("Failed to authorize user.", err) 194 return 195 } 196 197 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 198 Collection: tangled.LabelOpNSID, 199 Repo: did, 200 Rkey: rkey, 201 Record: &lexutil.LexiconTypeDecoder{ 202 Val: &record, 203 }, 204 }) 205 if err != nil { 206 fail("Failed to create record on PDS for user.", err) 207 return 208 } 209 atUri := resp.Uri 210 211 tx, err := l.db.BeginTx(r.Context(), nil) 212 if err != nil { 213 fail("Failed to update labels. Try again later.", err) 214 return 215 } 216 217 rollback := func() { 218 err1 := tx.Rollback() 219 err2 := rollbackRecord(context.Background(), atUri, client) 220 221 // ignore txn complete errors, this is okay 222 if errors.Is(err1, sql.ErrTxDone) { 223 err1 = nil 224 } 225 226 if errs := errors.Join(err1, err2); errs != nil { 227 return 228 } 229 } 230 defer rollback() 231 232 for _, o := range validLabelOps { 233 if _, err := db.AddLabelOp(l.db, &o); err != nil { 234 fail("Failed to update labels. Try again later.", err) 235 return 236 } 237 } 238 239 err = tx.Commit() 240 if err != nil { 241 return 242 } 243 244 // clear aturi when everything is successful 245 atUri = "" 246 247 l.pages.HxRefresh(w) 248} 249 250// this is used to rollback changes made to the PDS 251// 252// it is a no-op if the provided ATURI is empty 253func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 254 if aturi == "" { 255 return nil 256 } 257 258 parsed := syntax.ATURI(aturi) 259 260 collection := parsed.Collection().String() 261 repo := parsed.Authority().String() 262 rkey := parsed.RecordKey().String() 263 264 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 265 Collection: collection, 266 Repo: repo, 267 Rkey: rkey, 268 }) 269 return err 270}