1package labels
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "time"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 lexutil "github.com/bluesky-social/indigo/lex/util"
15 "github.com/go-chi/chi/v5"
16
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/middleware"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/oauth"
22 "tangled.org/core/appview/pages"
23 "tangled.org/core/appview/validator"
24 "tangled.org/core/appview/xrpcclient"
25 "tangled.org/core/log"
26 "tangled.org/core/rbac"
27 "tangled.org/core/tid"
28)
29
30type Labels struct {
31 oauth *oauth.OAuth
32 pages *pages.Pages
33 db *db.DB
34 logger *slog.Logger
35 validator *validator.Validator
36 enforcer *rbac.Enforcer
37}
38
39func New(
40 oauth *oauth.OAuth,
41 pages *pages.Pages,
42 db *db.DB,
43 validator *validator.Validator,
44 enforcer *rbac.Enforcer,
45) *Labels {
46 logger := log.New("labels")
47
48 return &Labels{
49 oauth: oauth,
50 pages: pages,
51 db: db,
52 logger: logger,
53 validator: validator,
54 enforcer: enforcer,
55 }
56}
57
58func (l *Labels) Router(mw *middleware.Middleware) http.Handler {
59 r := chi.NewRouter()
60
61 r.Use(middleware.AuthMiddleware(l.oauth))
62 r.Put("/perform", l.PerformLabelOp)
63
64 return r
65}
66
67// this is a tricky handler implementation:
68// - the user selects the new state of all the labels in the label panel and hits save
69// - this handler should calculate the diff in order to create the labelop record
70// - we need the diff in order to maintain a "history" of operations performed by users
71func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) {
72 user := l.oauth.GetUser(r)
73
74 noticeId := "add-label-error"
75
76 fail := func(msg string, err error) {
77 l.logger.Error("failed to add label", "err", err)
78 l.pages.Notice(w, noticeId, msg)
79 }
80
81 if err := r.ParseForm(); err != nil {
82 fail("Invalid form.", err)
83 return
84 }
85
86 did := user.Did
87 rkey := tid.TID()
88 performedAt := time.Now()
89 indexedAt := time.Now()
90 repoAt := r.Form.Get("repo")
91 subjectUri := r.Form.Get("subject")
92
93 repo, err := db.GetRepo(l.db, db.FilterEq("at_uri", repoAt))
94 if err != nil {
95 fail("Failed to get repository.", err)
96 return
97 }
98
99 // find all the labels that this repo subscribes to
100 repoLabels, err := db.GetRepoLabels(l.db, db.FilterEq("repo_at", repoAt))
101 if err != nil {
102 fail("Failed to get labels for this repository.", err)
103 return
104 }
105
106 var labelAts []string
107 for _, rl := range repoLabels {
108 labelAts = append(labelAts, rl.LabelAt.String())
109 }
110
111 actx, err := db.NewLabelApplicationCtx(l.db, db.FilterIn("at_uri", labelAts))
112 if err != nil {
113 fail("Invalid form data.", err)
114 return
115 }
116
117 // calculate the start state by applying already known labels
118 existingOps, err := db.GetLabelOps(l.db, db.FilterEq("subject", subjectUri))
119 if err != nil {
120 fail("Invalid form data.", err)
121 return
122 }
123
124 labelState := models.NewLabelState()
125 actx.ApplyLabelOps(labelState, existingOps)
126
127 var labelOps []models.LabelOp
128
129 // first delete all existing state
130 for key, vals := range labelState.Inner() {
131 for val := range vals {
132 labelOps = append(labelOps, models.LabelOp{
133 Did: did,
134 Rkey: rkey,
135 Subject: syntax.ATURI(subjectUri),
136 Operation: models.LabelOperationDel,
137 OperandKey: key,
138 OperandValue: val,
139 PerformedAt: performedAt,
140 IndexedAt: indexedAt,
141 })
142 }
143 }
144
145 // add all the new state the user specified
146 for key, vals := range r.Form {
147 if _, ok := actx.Defs[key]; !ok {
148 continue
149 }
150
151 for _, val := range vals {
152 labelOps = append(labelOps, models.LabelOp{
153 Did: did,
154 Rkey: rkey,
155 Subject: syntax.ATURI(subjectUri),
156 Operation: models.LabelOperationAdd,
157 OperandKey: key,
158 OperandValue: val,
159 PerformedAt: performedAt,
160 IndexedAt: indexedAt,
161 })
162 }
163 }
164
165 for i := range labelOps {
166 def := actx.Defs[labelOps[i].OperandKey]
167 if err := l.validator.ValidateLabelOp(def, repo, &labelOps[i]); err != nil {
168 fail(fmt.Sprintf("Invalid form data: %s", err), err)
169 return
170 }
171 }
172
173 // reduce the opset
174 labelOps = models.ReduceLabelOps(labelOps)
175
176 // next, apply all ops introduced in this request and filter out ones that are no-ops
177 validLabelOps := labelOps[:0]
178 for _, op := range labelOps {
179 if err = actx.ApplyLabelOp(labelState, op); err != models.LabelNoOpError {
180 validLabelOps = append(validLabelOps, op)
181 }
182 }
183
184 // nothing to do
185 if len(validLabelOps) == 0 {
186 l.pages.HxRefresh(w)
187 return
188 }
189
190 // create an atproto record of valid ops
191 record := models.LabelOpsAsRecord(validLabelOps)
192
193 client, err := l.oauth.AuthorizedClient(r)
194 if err != nil {
195 fail("Failed to authorize user.", err)
196 return
197 }
198
199 resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
200 Collection: tangled.LabelOpNSID,
201 Repo: did,
202 Rkey: rkey,
203 Record: &lexutil.LexiconTypeDecoder{
204 Val: &record,
205 },
206 })
207 if err != nil {
208 fail("Failed to create record on PDS for user.", err)
209 return
210 }
211 atUri := resp.Uri
212
213 tx, err := l.db.BeginTx(r.Context(), nil)
214 if err != nil {
215 fail("Failed to update labels. Try again later.", err)
216 return
217 }
218
219 rollback := func() {
220 err1 := tx.Rollback()
221 err2 := rollbackRecord(context.Background(), atUri, client)
222
223 // ignore txn complete errors, this is okay
224 if errors.Is(err1, sql.ErrTxDone) {
225 err1 = nil
226 }
227
228 if errs := errors.Join(err1, err2); errs != nil {
229 return
230 }
231 }
232 defer rollback()
233
234 for _, o := range validLabelOps {
235 if _, err := db.AddLabelOp(l.db, &o); err != nil {
236 fail("Failed to update labels. Try again later.", err)
237 return
238 }
239 }
240
241 err = tx.Commit()
242 if err != nil {
243 return
244 }
245
246 // clear aturi when everything is successful
247 atUri = ""
248
249 l.pages.HxRefresh(w)
250}
251
252// this is used to rollback changes made to the PDS
253//
254// it is a no-op if the provided ATURI is empty
255func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
256 if aturi == "" {
257 return nil
258 }
259
260 parsed := syntax.ATURI(aturi)
261
262 collection := parsed.Collection().String()
263 repo := parsed.Authority().String()
264 rkey := parsed.RecordKey().String()
265
266 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
267 Collection: collection,
268 Repo: repo,
269 Rkey: rkey,
270 })
271 return err
272}