forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package labels
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "time"
11
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/middleware"
15 "tangled.org/core/appview/models"
16 "tangled.org/core/appview/oauth"
17 "tangled.org/core/appview/pages"
18 "tangled.org/core/appview/validator"
19 "tangled.org/core/orm"
20 "tangled.org/core/rbac"
21 "tangled.org/core/tid"
22
23 comatproto "github.com/bluesky-social/indigo/api/atproto"
24 atpclient "github.com/bluesky-social/indigo/atproto/client"
25 "github.com/bluesky-social/indigo/atproto/syntax"
26 lexutil "github.com/bluesky-social/indigo/lex/util"
27 "github.com/go-chi/chi/v5"
28)
29
30type Labels struct {
31 oauth *oauth.OAuth
32 pages *pages.Pages
33 db *db.DB
34 logger *slog.Logger
35 validator *validator.Validator
36 enforcer *rbac.Enforcer
37}
38
39func New(
40 oauth *oauth.OAuth,
41 pages *pages.Pages,
42 db *db.DB,
43 validator *validator.Validator,
44 enforcer *rbac.Enforcer,
45 logger *slog.Logger,
46) *Labels {
47 return &Labels{
48 oauth: oauth,
49 pages: pages,
50 db: db,
51 logger: logger,
52 validator: validator,
53 enforcer: enforcer,
54 }
55}
56
57func (l *Labels) Router() http.Handler {
58 r := chi.NewRouter()
59
60 r.Use(middleware.AuthMiddleware(l.oauth))
61 r.Put("/perform", l.PerformLabelOp)
62
63 return r
64}
65
66// this is a tricky handler implementation:
67// - the user selects the new state of all the labels in the label panel and hits save
68// - this handler should calculate the diff in order to create the labelop record
69// - we need the diff in order to maintain a "history" of operations performed by users
70func (l *Labels) PerformLabelOp(w http.ResponseWriter, r *http.Request) {
71 user := l.oauth.GetUser(r)
72
73 noticeId := "add-label-error"
74
75 fail := func(msg string, err error) {
76 l.logger.Error("failed to add label", "err", err)
77 l.pages.Notice(w, noticeId, msg)
78 }
79
80 if err := r.ParseForm(); err != nil {
81 fail("Invalid form.", err)
82 return
83 }
84
85 did := user.Did
86 rkey := tid.TID()
87 performedAt := time.Now()
88 indexedAt := time.Now()
89 repoAt := r.Form.Get("repo")
90 subjectUri := r.Form.Get("subject")
91
92 repo, err := db.GetRepo(l.db, orm.FilterEq("at_uri", repoAt))
93 if err != nil {
94 fail("Failed to get repository.", err)
95 return
96 }
97
98 // find all the labels that this repo subscribes to
99 repoLabels, err := db.GetRepoLabels(l.db, orm.FilterEq("repo_at", repoAt))
100 if err != nil {
101 fail("Failed to get labels for this repository.", err)
102 return
103 }
104
105 var labelAts []string
106 for _, rl := range repoLabels {
107 labelAts = append(labelAts, rl.LabelAt.String())
108 }
109
110 actx, err := db.NewLabelApplicationCtx(l.db, orm.FilterIn("at_uri", labelAts))
111 if err != nil {
112 fail("Invalid form data.", err)
113 return
114 }
115
116 // calculate the start state by applying already known labels
117 existingOps, err := db.GetLabelOps(l.db, orm.FilterEq("subject", subjectUri))
118 if err != nil {
119 fail("Invalid form data.", err)
120 return
121 }
122
123 labelState := models.NewLabelState()
124 actx.ApplyLabelOps(labelState, existingOps)
125
126 var labelOps []models.LabelOp
127
128 // first delete all existing state
129 for key, vals := range labelState.Inner() {
130 for val := range vals {
131 labelOps = append(labelOps, models.LabelOp{
132 Did: did,
133 Rkey: rkey,
134 Subject: syntax.ATURI(subjectUri),
135 Operation: models.LabelOperationDel,
136 OperandKey: key,
137 OperandValue: val,
138 PerformedAt: performedAt,
139 IndexedAt: indexedAt,
140 })
141 }
142 }
143
144 // add all the new state the user specified
145 for key, vals := range r.Form {
146 if _, ok := actx.Defs[key]; !ok {
147 continue
148 }
149
150 for _, val := range vals {
151 labelOps = append(labelOps, models.LabelOp{
152 Did: did,
153 Rkey: rkey,
154 Subject: syntax.ATURI(subjectUri),
155 Operation: models.LabelOperationAdd,
156 OperandKey: key,
157 OperandValue: val,
158 PerformedAt: performedAt,
159 IndexedAt: indexedAt,
160 })
161 }
162 }
163
164 for i := range labelOps {
165 def := actx.Defs[labelOps[i].OperandKey]
166 if err := l.validator.ValidateLabelOp(def, repo, &labelOps[i]); err != nil {
167 fail(fmt.Sprintf("Invalid form data: %s", err), err)
168 return
169 }
170 }
171
172 // reduce the opset
173 labelOps = models.ReduceLabelOps(labelOps)
174
175 // next, apply all ops introduced in this request and filter out ones that are no-ops
176 validLabelOps := labelOps[:0]
177 for _, op := range labelOps {
178 if err = actx.ApplyLabelOp(labelState, op); err != models.LabelNoOpError {
179 validLabelOps = append(validLabelOps, op)
180 }
181 }
182
183 // nothing to do
184 if len(validLabelOps) == 0 {
185 l.pages.HxRefresh(w)
186 return
187 }
188
189 // create an atproto record of valid ops
190 record := models.LabelOpsAsRecord(validLabelOps)
191
192 client, err := l.oauth.AuthorizedClient(r)
193 if err != nil {
194 fail("Failed to authorize user.", err)
195 return
196 }
197
198 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
199 Collection: tangled.LabelOpNSID,
200 Repo: did,
201 Rkey: rkey,
202 Record: &lexutil.LexiconTypeDecoder{
203 Val: &record,
204 },
205 })
206 if err != nil {
207 fail("Failed to create record on PDS for user.", err)
208 return
209 }
210 atUri := resp.Uri
211
212 tx, err := l.db.BeginTx(r.Context(), nil)
213 if err != nil {
214 fail("Failed to update labels. Try again later.", err)
215 return
216 }
217
218 rollback := func() {
219 err1 := tx.Rollback()
220 err2 := rollbackRecord(context.Background(), atUri, client)
221
222 // ignore txn complete errors, this is okay
223 if errors.Is(err1, sql.ErrTxDone) {
224 err1 = nil
225 }
226
227 if errs := errors.Join(err1, err2); errs != nil {
228 return
229 }
230 }
231 defer rollback()
232
233 for _, o := range validLabelOps {
234 if _, err := db.AddLabelOp(l.db, &o); err != nil {
235 fail("Failed to update labels. Try again later.", err)
236 return
237 }
238 }
239
240 err = tx.Commit()
241 if err != nil {
242 return
243 }
244
245 // clear aturi when everything is successful
246 atUri = ""
247
248 l.pages.HxRefresh(w)
249}
250
251// this is used to rollback changes made to the PDS
252//
253// it is a no-op if the provided ATURI is empty
254func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
255 if aturi == "" {
256 return nil
257 }
258
259 parsed := syntax.ATURI(aturi)
260
261 collection := parsed.Collection().String()
262 repo := parsed.Authority().String()
263 rkey := parsed.RecordKey().String()
264
265 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
266 Collection: collection,
267 Repo: repo,
268 Rkey: rkey,
269 })
270 return err
271}