1package labels
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "time"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 lexutil "github.com/bluesky-social/indigo/lex/util"
15 "github.com/go-chi/chi/v5"
16
17 "tangled.sh/tangled.sh/core/api/tangled"
18 "tangled.sh/tangled.sh/core/appview/db"
19 "tangled.sh/tangled.sh/core/appview/middleware"
20 "tangled.sh/tangled.sh/core/appview/models"
21 "tangled.sh/tangled.sh/core/appview/oauth"
22 "tangled.sh/tangled.sh/core/appview/pages"
23 "tangled.sh/tangled.sh/core/appview/validator"
24 "tangled.sh/tangled.sh/core/appview/xrpcclient"
25 "tangled.sh/tangled.sh/core/log"
26 "tangled.sh/tangled.sh/core/tid"
27)
28
29type Labels struct {
30 oauth *oauth.OAuth
31 pages *pages.Pages
32 db *db.DB
33 logger *slog.Logger
34 validator *validator.Validator
35}
36
37func New(
38 oauth *oauth.OAuth,
39 pages *pages.Pages,
40 db *db.DB,
41 validator *validator.Validator,
42) *Labels {
43 logger := log.New("labels")
44
45 return &Labels{
46 oauth: oauth,
47 pages: pages,
48 db: db,
49 logger: logger,
50 validator: validator,
51 }
52}
53
54func (l *Labels) Router(mw *middleware.Middleware) http.Handler {
55 r := chi.NewRouter()
56
57 r.Use(middleware.AuthMiddleware(l.oauth))
58 r.Put("/perform", l.PerformLabelOp)
59
60 return r
61}
62
63// this is a tricky handler implementation:
64// - the user selects the new state of all the labels in the label panel and hits save
65// - this handler should calculate the diff in order to create the labelop record
66// - we need the diff in order to maintain a "history" of operations performed by users
67func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) {
68 user := l.oauth.GetUser(r)
69
70 noticeId := "add-label-error"
71
72 fail := func(msg string, err error) {
73 l.logger.Error("failed to add label", "err", err)
74 l.pages.Notice(w, noticeId, msg)
75 }
76
77 if err := r.ParseForm(); err != nil {
78 fail("Invalid form.", err)
79 return
80 }
81
82 did := user.Did
83 rkey := tid.TID()
84 performedAt := time.Now()
85 indexedAt := time.Now()
86 repoAt := r.Form.Get("repo")
87 subjectUri := r.Form.Get("subject")
88
89 // find all the labels that this repo subscribes to
90 repoLabels, err := db.GetRepoLabels(l.db, db.FilterEq("repo_at", repoAt))
91 if err != nil {
92 fail("Failed to get labels for this repository.", err)
93 return
94 }
95
96 var labelAts []string
97 for _, rl := range repoLabels {
98 labelAts = append(labelAts, rl.LabelAt.String())
99 }
100
101 actx, err := db.NewLabelApplicationCtx(l.db, db.FilterIn("at_uri", labelAts))
102 if err != nil {
103 fail("Invalid form data.", err)
104 return
105 }
106
107 l.logger.Info("actx", "labels", labelAts)
108 l.logger.Info("actx", "defs", actx.Defs)
109
110 // calculate the start state by applying already known labels
111 existingOps, err := db.GetLabelOps(l.db, db.FilterEq("subject", subjectUri))
112 if err != nil {
113 fail("Invalid form data.", err)
114 return
115 }
116
117 labelState := models.NewLabelState()
118 actx.ApplyLabelOps(labelState, existingOps)
119
120 var labelOps []models.LabelOp
121
122 // first delete all existing state
123 for key, vals := range labelState.Inner() {
124 for val := range vals {
125 labelOps = append(labelOps, models.LabelOp{
126 Did: did,
127 Rkey: rkey,
128 Subject: syntax.ATURI(subjectUri),
129 Operation: models.LabelOperationDel,
130 OperandKey: key,
131 OperandValue: val,
132 PerformedAt: performedAt,
133 IndexedAt: indexedAt,
134 })
135 }
136 }
137
138 // add all the new state the user specified
139 for key, vals := range r.Form {
140 if _, ok := actx.Defs[key]; !ok {
141 continue
142 }
143
144 for _, val := range vals {
145 labelOps = append(labelOps, models.LabelOp{
146 Did: did,
147 Rkey: rkey,
148 Subject: syntax.ATURI(subjectUri),
149 Operation: models.LabelOperationAdd,
150 OperandKey: key,
151 OperandValue: val,
152 PerformedAt: performedAt,
153 IndexedAt: indexedAt,
154 })
155 }
156 }
157
158 // reduce the opset
159 labelOps = models.ReduceLabelOps(labelOps)
160
161 for i := range labelOps {
162 def := actx.Defs[labelOps[i].OperandKey]
163 if err := l.validator.ValidateLabelOp(def, &labelOps[i]); err != nil {
164 fail(fmt.Sprintf("Invalid form data: %s", err), err)
165 return
166 }
167 }
168
169 // next, apply all ops introduced in this request and filter out ones that are no-ops
170 validLabelOps := labelOps[:0]
171 for _, op := range labelOps {
172 if err = actx.ApplyLabelOp(labelState, op); err != models.LabelNoOpError {
173 validLabelOps = append(validLabelOps, op)
174 }
175 }
176
177 // nothing to do
178 if len(validLabelOps) == 0 {
179 l.pages.HxRefresh(w)
180 return
181 }
182
183 // create an atproto record of valid ops
184 record := models.LabelOpsAsRecord(validLabelOps)
185
186 client, err := l.oauth.AuthorizedClient(r)
187 if err != nil {
188 fail("Failed to authorize user.", err)
189 return
190 }
191
192 resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
193 Collection: tangled.LabelOpNSID,
194 Repo: did,
195 Rkey: rkey,
196 Record: &lexutil.LexiconTypeDecoder{
197 Val: &record,
198 },
199 })
200 if err != nil {
201 fail("Failed to create record on PDS for user.", err)
202 return
203 }
204 atUri := resp.Uri
205
206 tx, err := l.db.BeginTx(r.Context(), nil)
207 if err != nil {
208 fail("Failed to update labels. Try again later.", err)
209 return
210 }
211
212 rollback := func() {
213 err1 := tx.Rollback()
214 err2 := rollbackRecord(context.Background(), atUri, client)
215
216 // ignore txn complete errors, this is okay
217 if errors.Is(err1, sql.ErrTxDone) {
218 err1 = nil
219 }
220
221 if errs := errors.Join(err1, err2); errs != nil {
222 return
223 }
224 }
225 defer rollback()
226
227 for _, o := range validLabelOps {
228 if _, err := db.AddLabelOp(l.db, &o); err != nil {
229 fail("Failed to update labels. Try again later.", err)
230 return
231 }
232 }
233
234 err = tx.Commit()
235 if err != nil {
236 return
237 }
238
239 // clear aturi when everything is successful
240 atUri = ""
241
242 l.pages.HxRefresh(w)
243}
244
245// this is used to rollback changes made to the PDS
246//
247// it is a no-op if the provided ATURI is empty
248func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
249 if aturi == "" {
250 return nil
251 }
252
253 parsed := syntax.ATURI(aturi)
254
255 collection := parsed.Collection().String()
256 repo := parsed.Authority().String()
257 rkey := parsed.RecordKey().String()
258
259 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
260 Collection: collection,
261 Repo: repo,
262 Rkey: rkey,
263 })
264 return err
265}