forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 securejoin "github.com/cyphar/filepath-securejoin"
18 "github.com/go-chi/chi/v5"
19 "github.com/posthog/posthog-go"
20 "tangled.org/core/api/tangled"
21 "tangled.org/core/appview"
22 "tangled.org/core/appview/cache"
23 "tangled.org/core/appview/cache/session"
24 "tangled.org/core/appview/config"
25 "tangled.org/core/appview/db"
26 "tangled.org/core/appview/models"
27 "tangled.org/core/appview/notify"
28 "tangled.org/core/appview/oauth"
29 "tangled.org/core/appview/pages"
30 posthogService "tangled.org/core/appview/posthog"
31 "tangled.org/core/appview/reporesolver"
32 "tangled.org/core/appview/validator"
33 xrpcclient "tangled.org/core/appview/xrpcclient"
34 "tangled.org/core/eventconsumer"
35 "tangled.org/core/idresolver"
36 "tangled.org/core/jetstream"
37 tlog "tangled.org/core/log"
38 "tangled.org/core/rbac"
39 "tangled.org/core/tid"
40)
41
42type State struct {
43 db *db.DB
44 notifier notify.Notifier
45 oauth *oauth.OAuth
46 enforcer *rbac.Enforcer
47 pages *pages.Pages
48 sess *session.SessionStore
49 idResolver *idresolver.Resolver
50 posthog posthog.Client
51 jc *jetstream.JetstreamClient
52 config *config.Config
53 repoResolver *reporesolver.RepoResolver
54 knotstream *eventconsumer.Consumer
55 spindlestream *eventconsumer.Consumer
56 logger *slog.Logger
57 validator *validator.Validator
58}
59
60func Make(ctx context.Context, config *config.Config) (*State, error) {
61 d, err := db.Make(config.Core.DbPath)
62 if err != nil {
63 return nil, fmt.Errorf("failed to create db: %w", err)
64 }
65
66 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
67 if err != nil {
68 return nil, fmt.Errorf("failed to create enforcer: %w", err)
69 }
70
71 res, err := idresolver.RedisResolver(config.Redis.ToURL())
72 if err != nil {
73 log.Printf("failed to create redis resolver: %v", err)
74 res = idresolver.DefaultResolver()
75 }
76
77 pgs := pages.NewPages(config, res)
78 cache := cache.New(config.Redis.Addr)
79 sess := session.New(cache)
80 oauth := oauth.NewOAuth(config, sess)
81 validator := validator.New(d, res)
82
83 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
84 if err != nil {
85 return nil, fmt.Errorf("failed to create posthog client: %w", err)
86 }
87
88 repoResolver := reporesolver.New(config, enforcer, res, d)
89
90 wrapper := db.DbWrapper{d}
91 jc, err := jetstream.NewJetstreamClient(
92 config.Jetstream.Endpoint,
93 "appview",
94 []string{
95 tangled.GraphFollowNSID,
96 tangled.FeedStarNSID,
97 tangled.PublicKeyNSID,
98 tangled.RepoArtifactNSID,
99 tangled.ActorProfileNSID,
100 tangled.SpindleMemberNSID,
101 tangled.SpindleNSID,
102 tangled.StringNSID,
103 tangled.RepoIssueNSID,
104 tangled.RepoIssueCommentNSID,
105 tangled.LabelDefinitionNSID,
106 },
107 nil,
108 slog.Default(),
109 wrapper,
110 false,
111
112 // in-memory filter is inapplicalble to appview so
113 // we'll never log dids anyway.
114 false,
115 )
116 if err != nil {
117 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
118 }
119
120 ingester := appview.Ingester{
121 Db: wrapper,
122 Enforcer: enforcer,
123 IdResolver: res,
124 Config: config,
125 Logger: tlog.New("ingester"),
126 Validator: validator,
127 }
128 err = jc.StartJetstream(ctx, ingester.Ingest())
129 if err != nil {
130 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
131 }
132
133 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
134 if err != nil {
135 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
136 }
137 knotstream.Start(ctx)
138
139 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
140 if err != nil {
141 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
142 }
143 spindlestream.Start(ctx)
144
145 var notifiers []notify.Notifier
146 if !config.Core.Dev {
147 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog))
148 }
149 notifier := notify.NewMergedNotifier(notifiers...)
150
151 state := &State{
152 d,
153 notifier,
154 oauth,
155 enforcer,
156 pgs,
157 sess,
158 res,
159 posthog,
160 jc,
161 config,
162 repoResolver,
163 knotstream,
164 spindlestream,
165 slog.Default(),
166 validator,
167 }
168
169 return state, nil
170}
171
172func (s *State) Close() error {
173 // other close up logic goes here
174 return s.db.Close()
175}
176
177func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
178 w.Header().Set("Content-Type", "image/svg+xml")
179 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
180 w.Header().Set("ETag", `"favicon-svg-v1"`)
181
182 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
183 w.WriteHeader(http.StatusNotModified)
184 return
185 }
186
187 s.pages.Favicon(w)
188}
189
190func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
191 user := s.oauth.GetUser(r)
192 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
193 LoggedInUser: user,
194 })
195}
196
197func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
198 user := s.oauth.GetUser(r)
199 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
200 LoggedInUser: user,
201 })
202}
203
204func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
205 if s.oauth.GetUser(r) != nil {
206 s.Timeline(w, r)
207 return
208 }
209 s.Home(w, r)
210}
211
212func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
213 user := s.oauth.GetUser(r)
214
215 var userDid string
216 if user != nil {
217 userDid = user.Did
218 }
219 timeline, err := db.MakeTimeline(s.db, 50, userDid)
220 if err != nil {
221 log.Println(err)
222 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
223 }
224
225 repos, err := db.GetTopStarredReposLastWeek(s.db)
226 if err != nil {
227 log.Println(err)
228 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
229 return
230 }
231
232 s.pages.Timeline(w, pages.TimelineParams{
233 LoggedInUser: user,
234 Timeline: timeline,
235 Repos: repos,
236 })
237}
238
239func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
240 user := s.oauth.GetUser(r)
241 l := s.logger.With("handler", "UpgradeBanner")
242 l = l.With("did", user.Did)
243 l = l.With("handle", user.Handle)
244
245 regs, err := db.GetRegistrations(
246 s.db,
247 db.FilterEq("did", user.Did),
248 db.FilterEq("needs_upgrade", 1),
249 )
250 if err != nil {
251 l.Error("non-fatal: failed to get registrations", "err", err)
252 }
253
254 spindles, err := db.GetSpindles(
255 s.db,
256 db.FilterEq("owner", user.Did),
257 db.FilterEq("needs_upgrade", 1),
258 )
259 if err != nil {
260 l.Error("non-fatal: failed to get spindles", "err", err)
261 }
262
263 if regs == nil && spindles == nil {
264 return
265 }
266
267 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
268 Registrations: regs,
269 Spindles: spindles,
270 })
271}
272
273func (s *State) Home(w http.ResponseWriter, r *http.Request) {
274 timeline, err := db.MakeTimeline(s.db, 5, "")
275 if err != nil {
276 log.Println(err)
277 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
278 return
279 }
280
281 repos, err := db.GetTopStarredReposLastWeek(s.db)
282 if err != nil {
283 log.Println(err)
284 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
285 return
286 }
287
288 s.pages.Home(w, pages.TimelineParams{
289 LoggedInUser: nil,
290 Timeline: timeline,
291 Repos: repos,
292 })
293}
294
295func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
296 user := chi.URLParam(r, "user")
297 user = strings.TrimPrefix(user, "@")
298
299 if user == "" {
300 w.WriteHeader(http.StatusBadRequest)
301 return
302 }
303
304 id, err := s.idResolver.ResolveIdent(r.Context(), user)
305 if err != nil {
306 w.WriteHeader(http.StatusInternalServerError)
307 return
308 }
309
310 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
311 if err != nil {
312 w.WriteHeader(http.StatusNotFound)
313 return
314 }
315
316 if len(pubKeys) == 0 {
317 w.WriteHeader(http.StatusNotFound)
318 return
319 }
320
321 for _, k := range pubKeys {
322 key := strings.TrimRight(k.Key, "\n")
323 fmt.Fprintln(w, key)
324 }
325}
326
327func validateRepoName(name string) error {
328 // check for path traversal attempts
329 if name == "." || name == ".." ||
330 strings.Contains(name, "/") || strings.Contains(name, "\\") {
331 return fmt.Errorf("Repository name contains invalid path characters")
332 }
333
334 // check for sequences that could be used for traversal when normalized
335 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
336 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
337 return fmt.Errorf("Repository name contains invalid path sequence")
338 }
339
340 // then continue with character validation
341 for _, char := range name {
342 if !((char >= 'a' && char <= 'z') ||
343 (char >= 'A' && char <= 'Z') ||
344 (char >= '0' && char <= '9') ||
345 char == '-' || char == '_' || char == '.') {
346 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
347 }
348 }
349
350 // additional check to prevent multiple sequential dots
351 if strings.Contains(name, "..") {
352 return fmt.Errorf("Repository name cannot contain sequential dots")
353 }
354
355 // if all checks pass
356 return nil
357}
358
359func stripGitExt(name string) string {
360 return strings.TrimSuffix(name, ".git")
361}
362
363func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
364 switch r.Method {
365 case http.MethodGet:
366 user := s.oauth.GetUser(r)
367 knots, err := s.enforcer.GetKnotsForUser(user.Did)
368 if err != nil {
369 s.pages.Notice(w, "repo", "Invalid user account.")
370 return
371 }
372
373 s.pages.NewRepo(w, pages.NewRepoParams{
374 LoggedInUser: user,
375 Knots: knots,
376 })
377
378 case http.MethodPost:
379 l := s.logger.With("handler", "NewRepo")
380
381 user := s.oauth.GetUser(r)
382 l = l.With("did", user.Did)
383 l = l.With("handle", user.Handle)
384
385 // form validation
386 domain := r.FormValue("domain")
387 if domain == "" {
388 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
389 return
390 }
391 l = l.With("knot", domain)
392
393 repoName := r.FormValue("name")
394 if repoName == "" {
395 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
396 return
397 }
398
399 if err := validateRepoName(repoName); err != nil {
400 s.pages.Notice(w, "repo", err.Error())
401 return
402 }
403 repoName = stripGitExt(repoName)
404 l = l.With("repoName", repoName)
405
406 defaultBranch := r.FormValue("branch")
407 if defaultBranch == "" {
408 defaultBranch = "main"
409 }
410 l = l.With("defaultBranch", defaultBranch)
411
412 description := r.FormValue("description")
413
414 // ACL validation
415 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
416 if err != nil || !ok {
417 l.Info("unauthorized")
418 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
419 return
420 }
421
422 // Check for existing repos
423 existingRepo, err := db.GetRepo(
424 s.db,
425 db.FilterEq("did", user.Did),
426 db.FilterEq("name", repoName),
427 )
428 if err == nil && existingRepo != nil {
429 l.Info("repo exists")
430 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
431 return
432 }
433
434 // create atproto record for this repo
435 rkey := tid.TID()
436 repo := &models.Repo{
437 Did: user.Did,
438 Name: repoName,
439 Knot: domain,
440 Rkey: rkey,
441 Description: description,
442 Created: time.Now(),
443 }
444 record := repo.AsRecord()
445
446 xrpcClient, err := s.oauth.AuthorizedClient(r)
447 if err != nil {
448 l.Info("PDS write failed", "err", err)
449 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
450 return
451 }
452
453 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
454 Collection: tangled.RepoNSID,
455 Repo: user.Did,
456 Rkey: rkey,
457 Record: &lexutil.LexiconTypeDecoder{
458 Val: &record,
459 },
460 })
461 if err != nil {
462 l.Info("PDS write failed", "err", err)
463 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
464 return
465 }
466
467 aturi := atresp.Uri
468 l = l.With("aturi", aturi)
469 l.Info("wrote to PDS")
470
471 tx, err := s.db.BeginTx(r.Context(), nil)
472 if err != nil {
473 l.Info("txn failed", "err", err)
474 s.pages.Notice(w, "repo", "Failed to save repository information.")
475 return
476 }
477
478 // The rollback function reverts a few things on failure:
479 // - the pending txn
480 // - the ACLs
481 // - the atproto record created
482 rollback := func() {
483 err1 := tx.Rollback()
484 err2 := s.enforcer.E.LoadPolicy()
485 err3 := rollbackRecord(context.Background(), aturi, xrpcClient)
486
487 // ignore txn complete errors, this is okay
488 if errors.Is(err1, sql.ErrTxDone) {
489 err1 = nil
490 }
491
492 if errs := errors.Join(err1, err2, err3); errs != nil {
493 l.Error("failed to rollback changes", "errs", errs)
494 return
495 }
496 }
497 defer rollback()
498
499 client, err := s.oauth.ServiceClient(
500 r,
501 oauth.WithService(domain),
502 oauth.WithLxm(tangled.RepoCreateNSID),
503 oauth.WithDev(s.config.Core.Dev),
504 )
505 if err != nil {
506 l.Error("service auth failed", "err", err)
507 s.pages.Notice(w, "repo", "Failed to reach PDS.")
508 return
509 }
510
511 xe := tangled.RepoCreate(
512 r.Context(),
513 client,
514 &tangled.RepoCreate_Input{
515 Rkey: rkey,
516 },
517 )
518 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
519 l.Error("xrpc error", "xe", xe)
520 s.pages.Notice(w, "repo", err.Error())
521 return
522 }
523
524 err = db.AddRepo(tx, repo)
525 if err != nil {
526 l.Error("db write failed", "err", err)
527 s.pages.Notice(w, "repo", "Failed to save repository information.")
528 return
529 }
530
531 // acls
532 p, _ := securejoin.SecureJoin(user.Did, repoName)
533 err = s.enforcer.AddRepo(user.Did, domain, p)
534 if err != nil {
535 l.Error("acl setup failed", "err", err)
536 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
537 return
538 }
539
540 err = tx.Commit()
541 if err != nil {
542 l.Error("txn commit failed", "err", err)
543 http.Error(w, err.Error(), http.StatusInternalServerError)
544 return
545 }
546
547 err = s.enforcer.E.SavePolicy()
548 if err != nil {
549 l.Error("acl save failed", "err", err)
550 http.Error(w, err.Error(), http.StatusInternalServerError)
551 return
552 }
553
554 // reset the ATURI because the transaction completed successfully
555 aturi = ""
556
557 s.notifier.NewRepo(r.Context(), repo)
558 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
559 }
560}
561
562// this is used to rollback changes made to the PDS
563//
564// it is a no-op if the provided ATURI is empty
565func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
566 if aturi == "" {
567 return nil
568 }
569
570 parsed := syntax.ATURI(aturi)
571
572 collection := parsed.Collection().String()
573 repo := parsed.Authority().String()
574 rkey := parsed.RecordKey().String()
575
576 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
577 Collection: collection,
578 Repo: repo,
579 Rkey: rkey,
580 })
581 return err
582}