1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/appview"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cache/session"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 dbnotify "tangled.org/core/appview/notify/db"
23 phnotify "tangled.org/core/appview/notify/posthog"
24 "tangled.org/core/appview/oauth"
25 "tangled.org/core/appview/pages"
26 "tangled.org/core/appview/reporesolver"
27 "tangled.org/core/appview/validator"
28 xrpcclient "tangled.org/core/appview/xrpcclient"
29 "tangled.org/core/eventconsumer"
30 "tangled.org/core/idresolver"
31 "tangled.org/core/jetstream"
32 tlog "tangled.org/core/log"
33 "tangled.org/core/rbac"
34 "tangled.org/core/tid"
35
36 comatproto "github.com/bluesky-social/indigo/api/atproto"
37 atpclient "github.com/bluesky-social/indigo/atproto/client"
38 "github.com/bluesky-social/indigo/atproto/syntax"
39 lexutil "github.com/bluesky-social/indigo/lex/util"
40 securejoin "github.com/cyphar/filepath-securejoin"
41 "github.com/go-chi/chi/v5"
42 "github.com/posthog/posthog-go"
43)
44
45type State struct {
46 db *db.DB
47 notifier notify.Notifier
48 oauth *oauth.OAuth
49 enforcer *rbac.Enforcer
50 pages *pages.Pages
51 sess *session.SessionStore
52 idResolver *idresolver.Resolver
53 posthog posthog.Client
54 jc *jetstream.JetstreamClient
55 config *config.Config
56 repoResolver *reporesolver.RepoResolver
57 knotstream *eventconsumer.Consumer
58 spindlestream *eventconsumer.Consumer
59 logger *slog.Logger
60 validator *validator.Validator
61}
62
63func Make(ctx context.Context, config *config.Config) (*State, error) {
64 d, err := db.Make(config.Core.DbPath)
65 if err != nil {
66 return nil, fmt.Errorf("failed to create db: %w", err)
67 }
68
69 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
70 if err != nil {
71 return nil, fmt.Errorf("failed to create enforcer: %w", err)
72 }
73
74 res, err := idresolver.RedisResolver(config.Redis.ToURL())
75 if err != nil {
76 log.Printf("failed to create redis resolver: %v", err)
77 res = idresolver.DefaultResolver()
78 }
79
80 pages := pages.NewPages(config, res)
81 cache := cache.New(config.Redis.Addr)
82 sess := session.New(cache)
83 oauth2, err := oauth.New(config)
84 if err != nil {
85 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
86 }
87 validator := validator.New(d, res, enforcer)
88
89 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
90 if err != nil {
91 return nil, fmt.Errorf("failed to create posthog client: %w", err)
92 }
93
94 repoResolver := reporesolver.New(config, enforcer, res, d)
95
96 wrapper := db.DbWrapper{Execer: d}
97 jc, err := jetstream.NewJetstreamClient(
98 config.Jetstream.Endpoint,
99 "appview",
100 []string{
101 tangled.GraphFollowNSID,
102 tangled.FeedStarNSID,
103 tangled.PublicKeyNSID,
104 tangled.RepoArtifactNSID,
105 tangled.ActorProfileNSID,
106 tangled.SpindleMemberNSID,
107 tangled.SpindleNSID,
108 tangled.StringNSID,
109 tangled.RepoIssueNSID,
110 tangled.RepoIssueCommentNSID,
111 tangled.LabelDefinitionNSID,
112 tangled.LabelOpNSID,
113 },
114 nil,
115 slog.Default(),
116 wrapper,
117 false,
118
119 // in-memory filter is inapplicalble to appview so
120 // we'll never log dids anyway.
121 false,
122 )
123 if err != nil {
124 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
125 }
126
127 if err := BackfillDefaultDefs(d, res); err != nil {
128 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
129 }
130
131 ingester := appview.Ingester{
132 Db: wrapper,
133 Enforcer: enforcer,
134 IdResolver: res,
135 Config: config,
136 Logger: tlog.New("ingester"),
137 Validator: validator,
138 }
139 err = jc.StartJetstream(ctx, ingester.Ingest())
140 if err != nil {
141 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
142 }
143
144 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
145 if err != nil {
146 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
147 }
148 knotstream.Start(ctx)
149
150 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
151 if err != nil {
152 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
153 }
154 spindlestream.Start(ctx)
155
156 var notifiers []notify.Notifier
157
158 // Always add the database notifier
159 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
160
161 // Add other notifiers in production only
162 if !config.Core.Dev {
163 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
164 }
165 notifier := notify.NewMergedNotifier(notifiers...)
166
167 state := &State{
168 d,
169 notifier,
170 oauth2,
171 enforcer,
172 pages,
173 sess,
174 res,
175 posthog,
176 jc,
177 config,
178 repoResolver,
179 knotstream,
180 spindlestream,
181 slog.Default(),
182 validator,
183 }
184
185 return state, nil
186}
187
188func (s *State) Close() error {
189 // other close up logic goes here
190 return s.db.Close()
191}
192
193func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
194 w.Header().Set("Content-Type", "image/svg+xml")
195 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
196 w.Header().Set("ETag", `"favicon-svg-v1"`)
197
198 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
199 w.WriteHeader(http.StatusNotModified)
200 return
201 }
202
203 s.pages.Favicon(w)
204}
205
206func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
207 w.Header().Set("Content-Type", "text/plain")
208 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
209
210 robotsTxt := `User-agent: *
211Allow: /
212`
213 w.Write([]byte(robotsTxt))
214}
215
216// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest
217const manifestJson = `{
218 "name": "tangled",
219 "description": "tightly-knit social coding.",
220 "icons": [
221 {
222 "src": "/favicon.svg",
223 "sizes": "144x144"
224 }
225 ],
226 "start_url": "/",
227 "id": "org.tangled",
228
229 "display": "standalone",
230 "background_color": "#111827",
231 "theme_color": "#111827"
232}`
233
234func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) {
235 w.Header().Set("Content-Type", "application/json")
236 w.Write([]byte(manifestJson))
237}
238
239func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
240 user := s.oauth.GetUser(r)
241 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
242 LoggedInUser: user,
243 })
244}
245
246func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
247 user := s.oauth.GetUser(r)
248 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
249 LoggedInUser: user,
250 })
251}
252
253func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
254 user := s.oauth.GetUser(r)
255 s.pages.Brand(w, pages.BrandParams{
256 LoggedInUser: user,
257 })
258}
259
260func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
261 if s.oauth.GetUser(r) != nil {
262 s.Timeline(w, r)
263 return
264 }
265 s.Home(w, r)
266}
267
268func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
269 user := s.oauth.GetUser(r)
270
271 // TODO: set this flag based on the UI
272 filtered := false
273
274 var userDid string
275 if user != nil {
276 userDid = user.Did
277 }
278 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
279 if err != nil {
280 log.Println(err)
281 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
282 }
283
284 repos, err := db.GetTopStarredReposLastWeek(s.db)
285 if err != nil {
286 log.Println(err)
287 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
288 return
289 }
290
291 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", models.LabelGoodFirstIssue))
292 if err != nil {
293 // non-fatal
294 }
295
296 s.pages.Timeline(w, pages.TimelineParams{
297 LoggedInUser: user,
298 Timeline: timeline,
299 Repos: repos,
300 GfiLabel: gfiLabel,
301 })
302}
303
304func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
305 user := s.oauth.GetUser(r)
306 if user == nil {
307 return
308 }
309
310 l := s.logger.With("handler", "UpgradeBanner")
311 l = l.With("did", user.Did)
312
313 regs, err := db.GetRegistrations(
314 s.db,
315 db.FilterEq("did", user.Did),
316 db.FilterEq("needs_upgrade", 1),
317 )
318 if err != nil {
319 l.Error("non-fatal: failed to get registrations", "err", err)
320 }
321
322 spindles, err := db.GetSpindles(
323 s.db,
324 db.FilterEq("owner", user.Did),
325 db.FilterEq("needs_upgrade", 1),
326 )
327 if err != nil {
328 l.Error("non-fatal: failed to get spindles", "err", err)
329 }
330
331 if regs == nil && spindles == nil {
332 return
333 }
334
335 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
336 Registrations: regs,
337 Spindles: spindles,
338 })
339}
340
341func (s *State) Home(w http.ResponseWriter, r *http.Request) {
342 // TODO: set this flag based on the UI
343 filtered := false
344
345 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
346 if err != nil {
347 log.Println(err)
348 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
349 return
350 }
351
352 repos, err := db.GetTopStarredReposLastWeek(s.db)
353 if err != nil {
354 log.Println(err)
355 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
356 return
357 }
358
359 s.pages.Home(w, pages.TimelineParams{
360 LoggedInUser: nil,
361 Timeline: timeline,
362 Repos: repos,
363 })
364}
365
366func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
367 user := chi.URLParam(r, "user")
368 user = strings.TrimPrefix(user, "@")
369
370 if user == "" {
371 w.WriteHeader(http.StatusBadRequest)
372 return
373 }
374
375 id, err := s.idResolver.ResolveIdent(r.Context(), user)
376 if err != nil {
377 w.WriteHeader(http.StatusInternalServerError)
378 return
379 }
380
381 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
382 if err != nil {
383 w.WriteHeader(http.StatusNotFound)
384 return
385 }
386
387 if len(pubKeys) == 0 {
388 w.WriteHeader(http.StatusNotFound)
389 return
390 }
391
392 for _, k := range pubKeys {
393 key := strings.TrimRight(k.Key, "\n")
394 fmt.Fprintln(w, key)
395 }
396}
397
398func validateRepoName(name string) error {
399 // check for path traversal attempts
400 if name == "." || name == ".." ||
401 strings.Contains(name, "/") || strings.Contains(name, "\\") {
402 return fmt.Errorf("Repository name contains invalid path characters")
403 }
404
405 // check for sequences that could be used for traversal when normalized
406 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
407 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
408 return fmt.Errorf("Repository name contains invalid path sequence")
409 }
410
411 // then continue with character validation
412 for _, char := range name {
413 if !((char >= 'a' && char <= 'z') ||
414 (char >= 'A' && char <= 'Z') ||
415 (char >= '0' && char <= '9') ||
416 char == '-' || char == '_' || char == '.') {
417 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
418 }
419 }
420
421 // additional check to prevent multiple sequential dots
422 if strings.Contains(name, "..") {
423 return fmt.Errorf("Repository name cannot contain sequential dots")
424 }
425
426 // if all checks pass
427 return nil
428}
429
430func stripGitExt(name string) string {
431 return strings.TrimSuffix(name, ".git")
432}
433
434func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
435 switch r.Method {
436 case http.MethodGet:
437 user := s.oauth.GetUser(r)
438 knots, err := s.enforcer.GetKnotsForUser(user.Did)
439 if err != nil {
440 s.pages.Notice(w, "repo", "Invalid user account.")
441 return
442 }
443
444 s.pages.NewRepo(w, pages.NewRepoParams{
445 LoggedInUser: user,
446 Knots: knots,
447 })
448
449 case http.MethodPost:
450 l := s.logger.With("handler", "NewRepo")
451
452 user := s.oauth.GetUser(r)
453 l = l.With("did", user.Did)
454
455 // form validation
456 domain := r.FormValue("domain")
457 if domain == "" {
458 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
459 return
460 }
461 l = l.With("knot", domain)
462
463 repoName := r.FormValue("name")
464 if repoName == "" {
465 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
466 return
467 }
468
469 if err := validateRepoName(repoName); err != nil {
470 s.pages.Notice(w, "repo", err.Error())
471 return
472 }
473 repoName = stripGitExt(repoName)
474 l = l.With("repoName", repoName)
475
476 defaultBranch := r.FormValue("branch")
477 if defaultBranch == "" {
478 defaultBranch = "main"
479 }
480 l = l.With("defaultBranch", defaultBranch)
481
482 description := r.FormValue("description")
483
484 // ACL validation
485 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
486 if err != nil || !ok {
487 l.Info("unauthorized")
488 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
489 return
490 }
491
492 // Check for existing repos
493 existingRepo, err := db.GetRepo(
494 s.db,
495 db.FilterEq("did", user.Did),
496 db.FilterEq("name", repoName),
497 )
498 if err == nil && existingRepo != nil {
499 l.Info("repo exists")
500 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
501 return
502 }
503
504 // create atproto record for this repo
505 rkey := tid.TID()
506 repo := &models.Repo{
507 Did: user.Did,
508 Name: repoName,
509 Knot: domain,
510 Rkey: rkey,
511 Description: description,
512 Created: time.Now(),
513 Labels: models.DefaultLabelDefs(),
514 }
515 record := repo.AsRecord()
516
517 atpClient, err := s.oauth.AuthorizedClient(r)
518 if err != nil {
519 l.Info("PDS write failed", "err", err)
520 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
521 return
522 }
523
524 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
525 Collection: tangled.RepoNSID,
526 Repo: user.Did,
527 Rkey: rkey,
528 Record: &lexutil.LexiconTypeDecoder{
529 Val: &record,
530 },
531 })
532 if err != nil {
533 l.Info("PDS write failed", "err", err)
534 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
535 return
536 }
537
538 aturi := atresp.Uri
539 l = l.With("aturi", aturi)
540 l.Info("wrote to PDS")
541
542 tx, err := s.db.BeginTx(r.Context(), nil)
543 if err != nil {
544 l.Info("txn failed", "err", err)
545 s.pages.Notice(w, "repo", "Failed to save repository information.")
546 return
547 }
548
549 // The rollback function reverts a few things on failure:
550 // - the pending txn
551 // - the ACLs
552 // - the atproto record created
553 rollback := func() {
554 err1 := tx.Rollback()
555 err2 := s.enforcer.E.LoadPolicy()
556 err3 := rollbackRecord(context.Background(), aturi, atpClient)
557
558 // ignore txn complete errors, this is okay
559 if errors.Is(err1, sql.ErrTxDone) {
560 err1 = nil
561 }
562
563 if errs := errors.Join(err1, err2, err3); errs != nil {
564 l.Error("failed to rollback changes", "errs", errs)
565 return
566 }
567 }
568 defer rollback()
569
570 client, err := s.oauth.ServiceClient(
571 r,
572 oauth.WithService(domain),
573 oauth.WithLxm(tangled.RepoCreateNSID),
574 oauth.WithDev(s.config.Core.Dev),
575 )
576 if err != nil {
577 l.Error("service auth failed", "err", err)
578 s.pages.Notice(w, "repo", "Failed to reach PDS.")
579 return
580 }
581
582 xe := tangled.RepoCreate(
583 r.Context(),
584 client,
585 &tangled.RepoCreate_Input{
586 Rkey: rkey,
587 },
588 )
589 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
590 l.Error("xrpc error", "xe", xe)
591 s.pages.Notice(w, "repo", err.Error())
592 return
593 }
594
595 err = db.AddRepo(tx, repo)
596 if err != nil {
597 l.Error("db write failed", "err", err)
598 s.pages.Notice(w, "repo", "Failed to save repository information.")
599 return
600 }
601
602 // acls
603 p, _ := securejoin.SecureJoin(user.Did, repoName)
604 err = s.enforcer.AddRepo(user.Did, domain, p)
605 if err != nil {
606 l.Error("acl setup failed", "err", err)
607 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
608 return
609 }
610
611 err = tx.Commit()
612 if err != nil {
613 l.Error("txn commit failed", "err", err)
614 http.Error(w, err.Error(), http.StatusInternalServerError)
615 return
616 }
617
618 err = s.enforcer.E.SavePolicy()
619 if err != nil {
620 l.Error("acl save failed", "err", err)
621 http.Error(w, err.Error(), http.StatusInternalServerError)
622 return
623 }
624
625 // reset the ATURI because the transaction completed successfully
626 aturi = ""
627
628 s.notifier.NewRepo(r.Context(), repo)
629 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Did, repoName))
630 }
631}
632
633// this is used to rollback changes made to the PDS
634//
635// it is a no-op if the provided ATURI is empty
636func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
637 if aturi == "" {
638 return nil
639 }
640
641 parsed := syntax.ATURI(aturi)
642
643 collection := parsed.Collection().String()
644 repo := parsed.Authority().String()
645 rkey := parsed.RecordKey().String()
646
647 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
648 Collection: collection,
649 Repo: repo,
650 Rkey: rkey,
651 })
652 return err
653}
654
655func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error {
656 defaults := models.DefaultLabelDefs()
657 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults))
658 if err != nil {
659 return err
660 }
661 // already present
662 if len(defaultLabels) == len(defaults) {
663 return nil
664 }
665
666 labelDefs, err := models.FetchDefaultDefs(r)
667 if err != nil {
668 return err
669 }
670
671 // Insert each label definition to the database
672 for _, labelDef := range labelDefs {
673 _, err = db.AddLabelDefinition(e, &labelDef)
674 if err != nil {
675 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
676 }
677 }
678
679 return nil
680}