forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package labels 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "time" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 lexutil "github.com/bluesky-social/indigo/lex/util" 15 "github.com/go-chi/chi/v5" 16 17 "tangled.sh/tangled.sh/core/api/tangled" 18 "tangled.sh/tangled.sh/core/appview/db" 19 "tangled.sh/tangled.sh/core/appview/middleware" 20 "tangled.sh/tangled.sh/core/appview/oauth" 21 "tangled.sh/tangled.sh/core/appview/pages" 22 "tangled.sh/tangled.sh/core/appview/validator" 23 "tangled.sh/tangled.sh/core/appview/xrpcclient" 24 "tangled.sh/tangled.sh/core/log" 25 "tangled.sh/tangled.sh/core/tid" 26) 27 28type Labels struct { 29 oauth *oauth.OAuth 30 pages *pages.Pages 31 db *db.DB 32 logger *slog.Logger 33 validator *validator.Validator 34} 35 36func New( 37 oauth *oauth.OAuth, 38 pages *pages.Pages, 39 db *db.DB, 40 validator *validator.Validator, 41) *Labels { 42 logger := log.New("labels") 43 44 return &Labels{ 45 oauth: oauth, 46 pages: pages, 47 db: db, 48 logger: logger, 49 validator: validator, 50 } 51} 52 53func (l *Labels) Router(mw *middleware.Middleware) http.Handler { 54 r := chi.NewRouter() 55 56 r.Use(middleware.AuthMiddleware(l.oauth)) 57 r.Put("/perform", l.PerformLabelOp) 58 59 return r 60} 61 62// this is a tricky handler implementation: 63// - the user selects the new state of all the labels in the label panel and hits save 64// - this handler should calculate the diff in order to create the labelop record 65// - we need the diff in order to maintain a "history" of operations performed by users 66func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) { 67 user := l.oauth.GetUser(r) 68 69 noticeId := "add-label-error" 70 71 fail := func(msg string, err error) { 72 l.logger.Error("failed to add label", "err", err) 73 l.pages.Notice(w, noticeId, msg) 74 } 75 76 if err := r.ParseForm(); err != nil { 77 fail("Invalid form.", err) 78 return 79 } 80 81 did := user.Did 82 rkey := tid.TID() 83 performedAt := time.Now() 84 indexedAt := time.Now() 85 repoAt := r.Form.Get("repo") 86 subjectUri := r.Form.Get("subject") 87 88 // find all the labels that this repo subscribes to 89 repoLabels, err := db.GetRepoLabels(l.db, db.FilterEq("repo_at", repoAt)) 90 if err != nil { 91 fail("Failed to get labels for this repository.", err) 92 return 93 } 94 95 var labelAts []string 96 for _, rl := range repoLabels { 97 labelAts = append(labelAts, rl.LabelAt.String()) 98 } 99 100 actx, err := db.NewLabelApplicationCtx(l.db, db.FilterIn("at_uri", labelAts)) 101 if err != nil { 102 fail("Invalid form data.", err) 103 return 104 } 105 106 l.logger.Info("actx", "labels", labelAts) 107 l.logger.Info("actx", "defs", actx.Defs) 108 109 // calculate the start state by applying already known labels 110 existingOps, err := db.GetLabelOps(l.db, db.FilterEq("subject", subjectUri)) 111 if err != nil { 112 fail("Invalid form data.", err) 113 return 114 } 115 116 labelState := db.NewLabelState() 117 actx.ApplyLabelOps(labelState, existingOps) 118 119 var labelOps []db.LabelOp 120 121 // first delete all existing state 122 for key, vals := range labelState.Inner() { 123 for val := range vals { 124 labelOps = append(labelOps, db.LabelOp{ 125 Did: did, 126 Rkey: rkey, 127 Subject: syntax.ATURI(subjectUri), 128 Operation: db.LabelOperationDel, 129 OperandKey: key, 130 OperandValue: val, 131 PerformedAt: performedAt, 132 IndexedAt: indexedAt, 133 }) 134 } 135 } 136 137 // add all the new state the user specified 138 for key, vals := range r.Form { 139 if _, ok := actx.Defs[key]; !ok { 140 continue 141 } 142 143 for _, val := range vals { 144 labelOps = append(labelOps, db.LabelOp{ 145 Did: did, 146 Rkey: rkey, 147 Subject: syntax.ATURI(subjectUri), 148 Operation: db.LabelOperationAdd, 149 OperandKey: key, 150 OperandValue: val, 151 PerformedAt: performedAt, 152 IndexedAt: indexedAt, 153 }) 154 } 155 } 156 157 // reduce the opset 158 labelOps = db.ReduceLabelOps(labelOps) 159 160 for i := range labelOps { 161 def := actx.Defs[labelOps[i].OperandKey] 162 if err := l.validator.ValidateLabelOp(def, &labelOps[i]); err != nil { 163 fail(fmt.Sprintf("Invalid form data: %s", err), err) 164 return 165 } 166 } 167 168 // next, apply all ops introduced in this request and filter out ones that are no-ops 169 validLabelOps := labelOps[:0] 170 for _, op := range labelOps { 171 if err = actx.ApplyLabelOp(labelState, op); err != db.LabelNoOpError { 172 validLabelOps = append(validLabelOps, op) 173 } 174 } 175 176 // nothing to do 177 if len(validLabelOps) == 0 { 178 l.pages.HxRefresh(w) 179 return 180 } 181 182 // create an atproto record of valid ops 183 record := db.LabelOpsAsRecord(validLabelOps) 184 185 client, err := l.oauth.AuthorizedClient(r) 186 if err != nil { 187 fail("Failed to authorize user.", err) 188 return 189 } 190 191 resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 192 Collection: tangled.LabelOpNSID, 193 Repo: did, 194 Rkey: rkey, 195 Record: &lexutil.LexiconTypeDecoder{ 196 Val: &record, 197 }, 198 }) 199 if err != nil { 200 fail("Failed to create record on PDS for user.", err) 201 return 202 } 203 atUri := resp.Uri 204 205 tx, err := l.db.BeginTx(r.Context(), nil) 206 if err != nil { 207 fail("Failed to update labels. Try again later.", err) 208 return 209 } 210 211 rollback := func() { 212 err1 := tx.Rollback() 213 err2 := rollbackRecord(context.Background(), atUri, client) 214 215 // ignore txn complete errors, this is okay 216 if errors.Is(err1, sql.ErrTxDone) { 217 err1 = nil 218 } 219 220 if errs := errors.Join(err1, err2); errs != nil { 221 return 222 } 223 } 224 defer rollback() 225 226 for _, o := range validLabelOps { 227 if _, err := db.AddLabelOp(l.db, &o); err != nil { 228 fail("Failed to update labels. Try again later.", err) 229 return 230 } 231 } 232 233 err = tx.Commit() 234 if err != nil { 235 return 236 } 237 238 // clear aturi when everything is successful 239 atUri = "" 240 241 l.pages.HxRefresh(w) 242} 243 244// this is used to rollback changes made to the PDS 245// 246// it is a no-op if the provided ATURI is empty 247func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error { 248 if aturi == "" { 249 return nil 250 } 251 252 parsed := syntax.ATURI(aturi) 253 254 collection := parsed.Collection().String() 255 repo := parsed.Authority().String() 256 rkey := parsed.RecordKey().String() 257 258 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{ 259 Collection: collection, 260 Repo: repo, 261 Rkey: rkey, 262 }) 263 return err 264}