forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 6.4 kB view raw
1package labels 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "time" 11 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/middleware" 15 "tangled.org/core/appview/models" 16 "tangled.org/core/appview/oauth" 17 "tangled.org/core/appview/pages" 18 "tangled.org/core/appview/validator" 19 "tangled.org/core/orm" 20 "tangled.org/core/rbac" 21 "tangled.org/core/tid" 22 23 comatproto "github.com/bluesky-social/indigo/api/atproto" 24 atpclient "github.com/bluesky-social/indigo/atproto/client" 25 "github.com/bluesky-social/indigo/atproto/syntax" 26 lexutil "github.com/bluesky-social/indigo/lex/util" 27 "github.com/go-chi/chi/v5" 28) 29 30type Labels struct { 31 oauth *oauth.OAuth 32 pages *pages.Pages 33 db *db.DB 34 logger *slog.Logger 35 validator *validator.Validator 36 enforcer *rbac.Enforcer 37} 38 39func New( 40 oauth *oauth.OAuth, 41 pages *pages.Pages, 42 db *db.DB, 43 validator *validator.Validator, 44 enforcer *rbac.Enforcer, 45 logger *slog.Logger, 46) *Labels { 47 return &Labels{ 48 oauth: oauth, 49 pages: pages, 50 db: db, 51 logger: logger, 52 validator: validator, 53 enforcer: enforcer, 54 } 55} 56 57func (l *Labels) Router() http.Handler { 58 r := chi.NewRouter() 59 60 r.Use(middleware.AuthMiddleware(l.oauth)) 61 r.Put("/perform", l.PerformLabelOp) 62 63 return r 64} 65 66// this is a tricky handler implementation: 67// - the user selects the new state of all the labels in the label panel and hits save 68// - this handler should calculate the diff in order to create the labelop record 69// - we need the diff in order to maintain a "history" of operations performed by users 70func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) { 71 user := l.oauth.GetUser(r) 72 73 noticeId := "add-label-error" 74 75 fail := func(msg string, err error) { 76 l.logger.Error("failed to add label", "err", err) 77 l.pages.Notice(w, noticeId, msg) 78 } 79 80 if err := r.ParseForm(); err != nil { 81 fail("Invalid form.", err) 82 return 83 } 84 85 did := user.Did 86 rkey := tid.TID() 87 performedAt := time.Now() 88 indexedAt := time.Now() 89 repoAt := r.Form.Get("repo") 90 subjectUri := r.Form.Get("subject") 91 92 repo, err := db.GetRepo(l.db, orm.FilterEq("at_uri", repoAt)) 93 if err != nil { 94 fail("Failed to get repository.", err) 95 return 96 } 97 98 // find all the labels that this repo subscribes to 99 repoLabels, err := db.GetRepoLabels(l.db, orm.FilterEq("repo_at", repoAt)) 100 if err != nil { 101 fail("Failed to get labels for this repository.", err) 102 return 103 } 104 105 var labelAts []string 106 for _, rl := range repoLabels { 107 labelAts = append(labelAts, rl.LabelAt.String()) 108 } 109 110 actx, err := db.NewLabelApplicationCtx(l.db, orm.FilterIn("at_uri", labelAts)) 111 if err != nil { 112 fail("Invalid form data.", err) 113 return 114 } 115 116 // calculate the start state by applying already known labels 117 existingOps, err := db.GetLabelOps(l.db, orm.FilterEq("subject", subjectUri)) 118 if err != nil { 119 fail("Invalid form data.", err) 120 return 121 } 122 123 labelState := models.NewLabelState() 124 actx.ApplyLabelOps(labelState, existingOps) 125 126 var labelOps []models.LabelOp 127 128 // first delete all existing state 129 for key, vals := range labelState.Inner() { 130 for val := range vals { 131 labelOps = append(labelOps, models.LabelOp{ 132 Did: did, 133 Rkey: rkey, 134 Subject: syntax.ATURI(subjectUri), 135 Operation: models.LabelOperationDel, 136 OperandKey: key, 137 OperandValue: val, 138 PerformedAt: performedAt, 139 IndexedAt: indexedAt, 140 }) 141 } 142 } 143 144 // add all the new state the user specified 145 for key, vals := range r.Form { 146 if _, ok := actx.Defs[key]; !ok { 147 continue 148 } 149 150 for _, val := range vals { 151 labelOps = append(labelOps, models.LabelOp{ 152 Did: did, 153 Rkey: rkey, 154 Subject: syntax.ATURI(subjectUri), 155 Operation: models.LabelOperationAdd, 156 OperandKey: key, 157 OperandValue: val, 158 PerformedAt: performedAt, 159 IndexedAt: indexedAt, 160 }) 161 } 162 } 163 164 for i := range labelOps { 165 def := actx.Defs[labelOps[i].OperandKey] 166 if err := l.validator.ValidateLabelOp(def, repo, &labelOps[i]); err != nil { 167 fail(fmt.Sprintf("Invalid form data: %s", err), err) 168 return 169 } 170 } 171 172 // reduce the opset 173 labelOps = models.ReduceLabelOps(labelOps) 174 175 // next, apply all ops introduced in this request and filter out ones that are no-ops 176 validLabelOps := labelOps[:0] 177 for _, op := range labelOps { 178 if err = actx.ApplyLabelOp(labelState, op); err != models.LabelNoOpError { 179 validLabelOps = append(validLabelOps, op) 180 } 181 } 182 183 // nothing to do 184 if len(validLabelOps) == 0 { 185 l.pages.HxRefresh(w) 186 return 187 } 188 189 // create an atproto record of valid ops 190 record := models.LabelOpsAsRecord(validLabelOps) 191 192 client, err := l.oauth.AuthorizedClient(r) 193 if err != nil { 194 fail("Failed to authorize user.", err) 195 return 196 } 197 198 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 199 Collection: tangled.LabelOpNSID, 200 Repo: did, 201 Rkey: rkey, 202 Record: &lexutil.LexiconTypeDecoder{ 203 Val: &record, 204 }, 205 }) 206 if err != nil { 207 fail("Failed to create record on PDS for user.", err) 208 return 209 } 210 atUri := resp.Uri 211 212 tx, err := l.db.BeginTx(r.Context(), nil) 213 if err != nil { 214 fail("Failed to update labels. Try again later.", err) 215 return 216 } 217 218 rollback := func() { 219 err1 := tx.Rollback() 220 err2 := rollbackRecord(context.Background(), atUri, client) 221 222 // ignore txn complete errors, this is okay 223 if errors.Is(err1, sql.ErrTxDone) { 224 err1 = nil 225 } 226 227 if errs := errors.Join(err1, err2); errs != nil { 228 return 229 } 230 } 231 defer rollback() 232 233 for _, o := range validLabelOps { 234 if _, err := db.AddLabelOp(l.db, &o); err != nil { 235 fail("Failed to update labels. Try again later.", err) 236 return 237 } 238 } 239 240 err = tx.Commit() 241 if err != nil { 242 return 243 } 244 245 // clear aturi when everything is successful 246 atUri = "" 247 248 l.pages.HxRefresh(w) 249} 250 251// this is used to rollback changes made to the PDS 252// 253// it is a no-op if the provided ATURI is empty 254func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 255 if aturi == "" { 256 return nil 257 } 258 259 parsed := syntax.ATURI(aturi) 260 261 collection := parsed.Collection().String() 262 repo := parsed.Authority().String() 263 rkey := parsed.RecordKey().String() 264 265 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 266 Collection: collection, 267 Repo: repo, 268 Rkey: rkey, 269 }) 270 return err 271}