forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/config"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/indexer"
18 "tangled.org/core/appview/mentions"
19 "tangled.org/core/appview/models"
20 "tangled.org/core/appview/notify"
21 dbnotify "tangled.org/core/appview/notify/db"
22 phnotify "tangled.org/core/appview/notify/posthog"
23 "tangled.org/core/appview/oauth"
24 "tangled.org/core/appview/pages"
25 "tangled.org/core/appview/reporesolver"
26 "tangled.org/core/appview/validator"
27 xrpcclient "tangled.org/core/appview/xrpcclient"
28 "tangled.org/core/eventconsumer"
29 "tangled.org/core/idresolver"
30 "tangled.org/core/jetstream"
31 "tangled.org/core/log"
32 tlog "tangled.org/core/log"
33 "tangled.org/core/orm"
34 "tangled.org/core/rbac"
35 "tangled.org/core/tid"
36
37 comatproto "github.com/bluesky-social/indigo/api/atproto"
38 atpclient "github.com/bluesky-social/indigo/atproto/client"
39 "github.com/bluesky-social/indigo/atproto/syntax"
40 lexutil "github.com/bluesky-social/indigo/lex/util"
41 securejoin "github.com/cyphar/filepath-securejoin"
42 "github.com/go-chi/chi/v5"
43 "github.com/posthog/posthog-go"
44)
45
46type State struct {
47 db *db.DB
48 notifier notify.Notifier
49 indexer *indexer.Indexer
50 oauth *oauth.OAuth
51 enforcer *rbac.Enforcer
52 pages *pages.Pages
53 idResolver *idresolver.Resolver
54 mentionsResolver *mentions.Resolver
55 posthog posthog.Client
56 jc *jetstream.JetstreamClient
57 config *config.Config
58 repoResolver *reporesolver.RepoResolver
59 knotstream *eventconsumer.Consumer
60 spindlestream *eventconsumer.Consumer
61 logger *slog.Logger
62 validator *validator.Validator
63}
64
65func Make(ctx context.Context, config *config.Config) (*State, error) {
66 logger := tlog.FromContext(ctx)
67
68 d, err := db.Make(ctx, config.Core.DbPath)
69 if err != nil {
70 return nil, fmt.Errorf("failed to create db: %w", err)
71 }
72
73 indexer := indexer.New(log.SubLogger(logger, "indexer"))
74 err = indexer.Init(ctx, d)
75 if err != nil {
76 return nil, fmt.Errorf("failed to create indexer: %w", err)
77 }
78
79 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
80 if err != nil {
81 return nil, fmt.Errorf("failed to create enforcer: %w", err)
82 }
83
84 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
85 if err != nil {
86 logger.Error("failed to create redis resolver", "err", err)
87 res = idresolver.DefaultResolver(config.Plc.PLCURL)
88 }
89
90 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
91 if err != nil {
92 return nil, fmt.Errorf("failed to create posthog client: %w", err)
93 }
94
95 pages := pages.NewPages(config, res, log.SubLogger(logger, "pages"))
96 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
97 if err != nil {
98 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
99 }
100 validator := validator.New(d, res, enforcer)
101
102 repoResolver := reporesolver.New(config, enforcer, d)
103
104 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
105
106 wrapper := db.DbWrapper{Execer: d}
107 jc, err := jetstream.NewJetstreamClient(
108 config.Jetstream.Endpoint,
109 "appview",
110 []string{
111 tangled.GraphFollowNSID,
112 tangled.FeedStarNSID,
113 tangled.PublicKeyNSID,
114 tangled.RepoArtifactNSID,
115 tangled.ActorProfileNSID,
116 tangled.SpindleMemberNSID,
117 tangled.SpindleNSID,
118 tangled.StringNSID,
119 tangled.RepoIssueNSID,
120 tangled.RepoIssueCommentNSID,
121 tangled.LabelDefinitionNSID,
122 tangled.LabelOpNSID,
123 },
124 nil,
125 tlog.SubLogger(logger, "jetstream"),
126 wrapper,
127 false,
128
129 // in-memory filter is inapplicalble to appview so
130 // we'll never log dids anyway.
131 false,
132 )
133 if err != nil {
134 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
135 }
136
137 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
138 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
139 }
140
141 ingester := appview.Ingester{
142 Db: wrapper,
143 Enforcer: enforcer,
144 IdResolver: res,
145 Config: config,
146 Logger: log.SubLogger(logger, "ingester"),
147 Validator: validator,
148 }
149 err = jc.StartJetstream(ctx, ingester.Ingest())
150 if err != nil {
151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
152 }
153
154 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
155 if err != nil {
156 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
157 }
158 knotstream.Start(ctx)
159
160 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
161 if err != nil {
162 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
163 }
164 spindlestream.Start(ctx)
165
166 var notifiers []notify.Notifier
167
168 // Always add the database notifier
169 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
170
171 // Add other notifiers in production only
172 if !config.Core.Dev {
173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
174 }
175 notifiers = append(notifiers, indexer)
176 notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify"))
177
178 state := &State{
179 d,
180 notifier,
181 indexer,
182 oauth,
183 enforcer,
184 pages,
185 res,
186 mentionsResolver,
187 posthog,
188 jc,
189 config,
190 repoResolver,
191 knotstream,
192 spindlestream,
193 logger,
194 validator,
195 }
196
197 return state, nil
198}
199
200func (s *State) Close() error {
201 // other close up logic goes here
202 return s.db.Close()
203}
204
205func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
206 w.Header().Set("Content-Type", "image/svg+xml")
207 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
208 w.Header().Set("ETag", `"favicon-svg-v1"`)
209
210 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
211 w.WriteHeader(http.StatusNotModified)
212 return
213 }
214
215 s.pages.Favicon(w)
216}
217
218func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
219 w.Header().Set("Content-Type", "text/plain")
220 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
221
222 robotsTxt := `User-agent: *
223Allow: /
224`
225 w.Write([]byte(robotsTxt))
226}
227
228// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest
229const manifestJson = `{
230 "name": "tangled",
231 "description": "tightly-knit social coding.",
232 "icons": [
233 {
234 "src": "/favicon.svg",
235 "sizes": "144x144"
236 }
237 ],
238 "start_url": "/",
239 "id": "org.tangled",
240
241 "display": "standalone",
242 "background_color": "#111827",
243 "theme_color": "#111827"
244}`
245
246func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) {
247 w.Header().Set("Content-Type", "application/json")
248 w.Write([]byte(manifestJson))
249}
250
251func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
252 user := s.oauth.GetUser(r)
253 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
254 LoggedInUser: user,
255 })
256}
257
258func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
259 user := s.oauth.GetUser(r)
260 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
261 LoggedInUser: user,
262 })
263}
264
265func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
266 user := s.oauth.GetUser(r)
267 s.pages.Brand(w, pages.BrandParams{
268 LoggedInUser: user,
269 })
270}
271
272func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
273 if s.oauth.GetUser(r) != nil {
274 s.Timeline(w, r)
275 return
276 }
277 s.Home(w, r)
278}
279
280func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
281 user := s.oauth.GetUser(r)
282
283 // TODO: set this flag based on the UI
284 filtered := false
285
286 var userDid string
287 if user != nil {
288 userDid = user.Did
289 }
290 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
291 if err != nil {
292 s.logger.Error("failed to make timeline", "err", err)
293 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
294 }
295
296 repos, err := db.GetTopStarredReposLastWeek(s.db)
297 if err != nil {
298 s.logger.Error("failed to get top starred repos", "err", err)
299 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
300 return
301 }
302
303 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue))
304 if err != nil {
305 // non-fatal
306 }
307
308 s.pages.Timeline(w, pages.TimelineParams{
309 LoggedInUser: user,
310 Timeline: timeline,
311 Repos: repos,
312 GfiLabel: gfiLabel,
313 })
314}
315
316func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
317 user := s.oauth.GetUser(r)
318 if user == nil {
319 return
320 }
321
322 l := s.logger.With("handler", "UpgradeBanner")
323 l = l.With("did", user.Did)
324
325 regs, err := db.GetRegistrations(
326 s.db,
327 orm.FilterEq("did", user.Did),
328 orm.FilterEq("needs_upgrade", 1),
329 )
330 if err != nil {
331 l.Error("non-fatal: failed to get registrations", "err", err)
332 }
333
334 spindles, err := db.GetSpindles(
335 s.db,
336 orm.FilterEq("owner", user.Did),
337 orm.FilterEq("needs_upgrade", 1),
338 )
339 if err != nil {
340 l.Error("non-fatal: failed to get spindles", "err", err)
341 }
342
343 if regs == nil && spindles == nil {
344 return
345 }
346
347 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
348 Registrations: regs,
349 Spindles: spindles,
350 })
351}
352
353func (s *State) Home(w http.ResponseWriter, r *http.Request) {
354 // TODO: set this flag based on the UI
355 filtered := false
356
357 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
358 if err != nil {
359 s.logger.Error("failed to make timeline", "err", err)
360 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
361 return
362 }
363
364 repos, err := db.GetTopStarredReposLastWeek(s.db)
365 if err != nil {
366 s.logger.Error("failed to get top starred repos", "err", err)
367 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
368 return
369 }
370
371 s.pages.Home(w, pages.TimelineParams{
372 LoggedInUser: nil,
373 Timeline: timeline,
374 Repos: repos,
375 })
376}
377
378func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
379 user := chi.URLParam(r, "user")
380 user = strings.TrimPrefix(user, "@")
381
382 if user == "" {
383 w.WriteHeader(http.StatusBadRequest)
384 return
385 }
386
387 id, err := s.idResolver.ResolveIdent(r.Context(), user)
388 if err != nil {
389 w.WriteHeader(http.StatusInternalServerError)
390 return
391 }
392
393 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
394 if err != nil {
395 s.logger.Error("failed to get public keys", "err", err)
396 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
397 return
398 }
399
400 if len(pubKeys) == 0 {
401 w.WriteHeader(http.StatusNoContent)
402 return
403 }
404
405 for _, k := range pubKeys {
406 key := strings.TrimRight(k.Key, "\n")
407 fmt.Fprintln(w, key)
408 }
409}
410
411func validateRepoName(name string) error {
412 // check for path traversal attempts
413 if name == "." || name == ".." ||
414 strings.Contains(name, "/") || strings.Contains(name, "\\") {
415 return fmt.Errorf("Repository name contains invalid path characters")
416 }
417
418 // check for sequences that could be used for traversal when normalized
419 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
420 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
421 return fmt.Errorf("Repository name contains invalid path sequence")
422 }
423
424 // then continue with character validation
425 for _, char := range name {
426 if !((char >= 'a' && char <= 'z') ||
427 (char >= 'A' && char <= 'Z') ||
428 (char >= '0' && char <= '9') ||
429 char == '-' || char == '_' || char == '.') {
430 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
431 }
432 }
433
434 // additional check to prevent multiple sequential dots
435 if strings.Contains(name, "..") {
436 return fmt.Errorf("Repository name cannot contain sequential dots")
437 }
438
439 // if all checks pass
440 return nil
441}
442
443func stripGitExt(name string) string {
444 return strings.TrimSuffix(name, ".git")
445}
446
447func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
448 switch r.Method {
449 case http.MethodGet:
450 user := s.oauth.GetUser(r)
451 knots, err := s.enforcer.GetKnotsForUser(user.Did)
452 if err != nil {
453 s.pages.Notice(w, "repo", "Invalid user account.")
454 return
455 }
456
457 s.pages.NewRepo(w, pages.NewRepoParams{
458 LoggedInUser: user,
459 Knots: knots,
460 })
461
462 case http.MethodPost:
463 l := s.logger.With("handler", "NewRepo")
464
465 user := s.oauth.GetUser(r)
466 l = l.With("did", user.Did)
467
468 // form validation
469 domain := r.FormValue("domain")
470 if domain == "" {
471 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
472 return
473 }
474 l = l.With("knot", domain)
475
476 repoName := r.FormValue("name")
477 if repoName == "" {
478 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
479 return
480 }
481
482 if err := validateRepoName(repoName); err != nil {
483 s.pages.Notice(w, "repo", err.Error())
484 return
485 }
486 repoName = stripGitExt(repoName)
487 l = l.With("repoName", repoName)
488
489 defaultBranch := r.FormValue("branch")
490 if defaultBranch == "" {
491 defaultBranch = "main"
492 }
493 l = l.With("defaultBranch", defaultBranch)
494
495 description := r.FormValue("description")
496
497 // ACL validation
498 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
499 if err != nil || !ok {
500 l.Info("unauthorized")
501 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
502 return
503 }
504
505 // Check for existing repos
506 existingRepo, err := db.GetRepo(
507 s.db,
508 orm.FilterEq("did", user.Did),
509 orm.FilterEq("name", repoName),
510 )
511 if err == nil && existingRepo != nil {
512 l.Info("repo exists")
513 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
514 return
515 }
516
517 // create atproto record for this repo
518 rkey := tid.TID()
519 repo := &models.Repo{
520 Did: user.Did,
521 Name: repoName,
522 Knot: domain,
523 Rkey: rkey,
524 Description: description,
525 Created: time.Now(),
526 Labels: s.config.Label.DefaultLabelDefs,
527 }
528 record := repo.AsRecord()
529
530 atpClient, err := s.oauth.AuthorizedClient(r)
531 if err != nil {
532 l.Info("PDS write failed", "err", err)
533 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
534 return
535 }
536
537 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
538 Collection: tangled.RepoNSID,
539 Repo: user.Did,
540 Rkey: rkey,
541 Record: &lexutil.LexiconTypeDecoder{
542 Val: &record,
543 },
544 })
545 if err != nil {
546 l.Info("PDS write failed", "err", err)
547 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
548 return
549 }
550
551 aturi := atresp.Uri
552 l = l.With("aturi", aturi)
553 l.Info("wrote to PDS")
554
555 tx, err := s.db.BeginTx(r.Context(), nil)
556 if err != nil {
557 l.Info("txn failed", "err", err)
558 s.pages.Notice(w, "repo", "Failed to save repository information.")
559 return
560 }
561
562 // The rollback function reverts a few things on failure:
563 // - the pending txn
564 // - the ACLs
565 // - the atproto record created
566 rollback := func() {
567 err1 := tx.Rollback()
568 err2 := s.enforcer.E.LoadPolicy()
569 err3 := rollbackRecord(context.Background(), aturi, atpClient)
570
571 // ignore txn complete errors, this is okay
572 if errors.Is(err1, sql.ErrTxDone) {
573 err1 = nil
574 }
575
576 if errs := errors.Join(err1, err2, err3); errs != nil {
577 l.Error("failed to rollback changes", "errs", errs)
578 return
579 }
580 }
581 defer rollback()
582
583 client, err := s.oauth.ServiceClient(
584 r,
585 oauth.WithService(domain),
586 oauth.WithLxm(tangled.RepoCreateNSID),
587 oauth.WithDev(s.config.Core.Dev),
588 )
589 if err != nil {
590 l.Error("service auth failed", "err", err)
591 s.pages.Notice(w, "repo", "Failed to reach PDS.")
592 return
593 }
594
595 xe := tangled.RepoCreate(
596 r.Context(),
597 client,
598 &tangled.RepoCreate_Input{
599 Rkey: rkey,
600 },
601 )
602 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
603 l.Error("xrpc error", "xe", xe)
604 s.pages.Notice(w, "repo", err.Error())
605 return
606 }
607
608 err = db.AddRepo(tx, repo)
609 if err != nil {
610 l.Error("db write failed", "err", err)
611 s.pages.Notice(w, "repo", "Failed to save repository information.")
612 return
613 }
614
615 // acls
616 p, _ := securejoin.SecureJoin(user.Did, repoName)
617 err = s.enforcer.AddRepo(user.Did, domain, p)
618 if err != nil {
619 l.Error("acl setup failed", "err", err)
620 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
621 return
622 }
623
624 err = tx.Commit()
625 if err != nil {
626 l.Error("txn commit failed", "err", err)
627 http.Error(w, err.Error(), http.StatusInternalServerError)
628 return
629 }
630
631 err = s.enforcer.E.SavePolicy()
632 if err != nil {
633 l.Error("acl save failed", "err", err)
634 http.Error(w, err.Error(), http.StatusInternalServerError)
635 return
636 }
637
638 // reset the ATURI because the transaction completed successfully
639 aturi = ""
640
641 s.notifier.NewRepo(r.Context(), repo)
642 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, repoName))
643 }
644}
645
646// this is used to rollback changes made to the PDS
647//
648// it is a no-op if the provided ATURI is empty
649func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
650 if aturi == "" {
651 return nil
652 }
653
654 parsed := syntax.ATURI(aturi)
655
656 collection := parsed.Collection().String()
657 repo := parsed.Authority().String()
658 rkey := parsed.RecordKey().String()
659
660 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
661 Collection: collection,
662 Repo: repo,
663 Rkey: rkey,
664 })
665 return err
666}
667
668func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
669 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
670 if err != nil {
671 return err
672 }
673 // already present
674 if len(defaultLabels) == len(defaults) {
675 return nil
676 }
677
678 labelDefs, err := models.FetchLabelDefs(r, defaults)
679 if err != nil {
680 return err
681 }
682
683 // Insert each label definition to the database
684 for _, labelDef := range labelDefs {
685 _, err = db.AddLabelDefinition(e, &labelDef)
686 if err != nil {
687 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
688 }
689 }
690
691 return nil
692}