1package labels
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "time"
11
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/middleware"
15 "tangled.org/core/appview/models"
16 "tangled.org/core/appview/oauth"
17 "tangled.org/core/appview/pages"
18 "tangled.org/core/appview/validator"
19 "tangled.org/core/rbac"
20 "tangled.org/core/tid"
21
22 comatproto "github.com/bluesky-social/indigo/api/atproto"
23 atpclient "github.com/bluesky-social/indigo/atproto/client"
24 "github.com/bluesky-social/indigo/atproto/syntax"
25 lexutil "github.com/bluesky-social/indigo/lex/util"
26 "github.com/go-chi/chi/v5"
27)
28
29type Labels struct {
30 oauth *oauth.OAuth
31 pages *pages.Pages
32 db *db.DB
33 logger *slog.Logger
34 validator *validator.Validator
35 enforcer *rbac.Enforcer
36}
37
38func New(
39 oauth *oauth.OAuth,
40 pages *pages.Pages,
41 db *db.DB,
42 validator *validator.Validator,
43 enforcer *rbac.Enforcer,
44 logger *slog.Logger,
45) *Labels {
46 return &Labels{
47 oauth: oauth,
48 pages: pages,
49 db: db,
50 logger: logger,
51 validator: validator,
52 enforcer: enforcer,
53 }
54}
55
56func (l *Labels) Router() http.Handler {
57 r := chi.NewRouter()
58
59 r.Use(middleware.AuthMiddleware(l.oauth))
60 r.Put("/perform", l.PerformLabelOp)
61
62 return r
63}
64
65// this is a tricky handler implementation:
66// - the user selects the new state of all the labels in the label panel and hits save
67// - this handler should calculate the diff in order to create the labelop record
68// - we need the diff in order to maintain a "history" of operations performed by users
69func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) {
70 user := l.oauth.GetUser(r)
71
72 noticeId := "add-label-error"
73
74 fail := func(msg string, err error) {
75 l.logger.Error("failed to add label", "err", err)
76 l.pages.Notice(w, noticeId, msg)
77 }
78
79 if err := r.ParseForm(); err != nil {
80 fail("Invalid form.", err)
81 return
82 }
83
84 did := user.Did
85 rkey := tid.TID()
86 performedAt := time.Now()
87 indexedAt := time.Now()
88 repoAt := r.Form.Get("repo")
89 subjectUri := r.Form.Get("subject")
90
91 repo, err := db.GetRepo(l.db, db.FilterEq("at_uri", repoAt))
92 if err != nil {
93 fail("Failed to get repository.", err)
94 return
95 }
96
97 // find all the labels that this repo subscribes to
98 repoLabels, err := db.GetRepoLabels(l.db, db.FilterEq("repo_at", repoAt))
99 if err != nil {
100 fail("Failed to get labels for this repository.", err)
101 return
102 }
103
104 var labelAts []string
105 for _, rl := range repoLabels {
106 labelAts = append(labelAts, rl.LabelAt.String())
107 }
108
109 actx, err := db.NewLabelApplicationCtx(l.db, db.FilterIn("at_uri", labelAts))
110 if err != nil {
111 fail("Invalid form data.", err)
112 return
113 }
114
115 // calculate the start state by applying already known labels
116 existingOps, err := db.GetLabelOps(l.db, db.FilterEq("subject", subjectUri))
117 if err != nil {
118 fail("Invalid form data.", err)
119 return
120 }
121
122 labelState := models.NewLabelState()
123 actx.ApplyLabelOps(labelState, existingOps)
124
125 var labelOps []models.LabelOp
126
127 // first delete all existing state
128 for key, vals := range labelState.Inner() {
129 for val := range vals {
130 labelOps = append(labelOps, models.LabelOp{
131 Did: did,
132 Rkey: rkey,
133 Subject: syntax.ATURI(subjectUri),
134 Operation: models.LabelOperationDel,
135 OperandKey: key,
136 OperandValue: val,
137 PerformedAt: performedAt,
138 IndexedAt: indexedAt,
139 })
140 }
141 }
142
143 // add all the new state the user specified
144 for key, vals := range r.Form {
145 if _, ok := actx.Defs[key]; !ok {
146 continue
147 }
148
149 for _, val := range vals {
150 labelOps = append(labelOps, models.LabelOp{
151 Did: did,
152 Rkey: rkey,
153 Subject: syntax.ATURI(subjectUri),
154 Operation: models.LabelOperationAdd,
155 OperandKey: key,
156 OperandValue: val,
157 PerformedAt: performedAt,
158 IndexedAt: indexedAt,
159 })
160 }
161 }
162
163 for i := range labelOps {
164 def := actx.Defs[labelOps[i].OperandKey]
165 if err := l.validator.ValidateLabelOp(def, repo, &labelOps[i]); err != nil {
166 fail(fmt.Sprintf("Invalid form data: %s", err), err)
167 return
168 }
169 }
170
171 // reduce the opset
172 labelOps = models.ReduceLabelOps(labelOps)
173
174 // next, apply all ops introduced in this request and filter out ones that are no-ops
175 validLabelOps := labelOps[:0]
176 for _, op := range labelOps {
177 if err = actx.ApplyLabelOp(labelState, op); err != models.LabelNoOpError {
178 validLabelOps = append(validLabelOps, op)
179 }
180 }
181
182 // nothing to do
183 if len(validLabelOps) == 0 {
184 l.pages.HxRefresh(w)
185 return
186 }
187
188 // create an atproto record of valid ops
189 record := models.LabelOpsAsRecord(validLabelOps)
190
191 client, err := l.oauth.AuthorizedClient(r)
192 if err != nil {
193 fail("Failed to authorize user.", err)
194 return
195 }
196
197 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
198 Collection: tangled.LabelOpNSID,
199 Repo: did,
200 Rkey: rkey,
201 Record: &lexutil.LexiconTypeDecoder{
202 Val: &record,
203 },
204 })
205 if err != nil {
206 fail("Failed to create record on PDS for user.", err)
207 return
208 }
209 atUri := resp.Uri
210
211 tx, err := l.db.BeginTx(r.Context(), nil)
212 if err != nil {
213 fail("Failed to update labels. Try again later.", err)
214 return
215 }
216
217 rollback := func() {
218 err1 := tx.Rollback()
219 err2 := rollbackRecord(context.Background(), atUri, client)
220
221 // ignore txn complete errors, this is okay
222 if errors.Is(err1, sql.ErrTxDone) {
223 err1 = nil
224 }
225
226 if errs := errors.Join(err1, err2); errs != nil {
227 return
228 }
229 }
230 defer rollback()
231
232 for _, o := range validLabelOps {
233 if _, err := db.AddLabelOp(l.db, &o); err != nil {
234 fail("Failed to update labels. Try again later.", err)
235 return
236 }
237 }
238
239 err = tx.Commit()
240 if err != nil {
241 return
242 }
243
244 // clear aturi when everything is successful
245 atUri = ""
246
247 l.pages.HxRefresh(w)
248}
249
250// this is used to rollback changes made to the PDS
251//
252// it is a no-op if the provided ATURI is empty
253func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
254 if aturi == "" {
255 return nil
256 }
257
258 parsed := syntax.ATURI(aturi)
259
260 collection := parsed.Collection().String()
261 repo := parsed.Authority().String()
262 rkey := parsed.RecordKey().String()
263
264 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
265 Collection: collection,
266 Repo: repo,
267 Rkey: rkey,
268 })
269 return err
270}