1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/appview"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cache/session"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 dbnotify "tangled.org/core/appview/notify/db"
23 phnotify "tangled.org/core/appview/notify/posthog"
24 "tangled.org/core/appview/oauth"
25 "tangled.org/core/appview/pages"
26 "tangled.org/core/appview/reporesolver"
27 "tangled.org/core/appview/validator"
28 xrpcclient "tangled.org/core/appview/xrpcclient"
29 "tangled.org/core/eventconsumer"
30 "tangled.org/core/idresolver"
31 "tangled.org/core/jetstream"
32 tlog "tangled.org/core/log"
33 "tangled.org/core/rbac"
34 "tangled.org/core/tid"
35
36 comatproto "github.com/bluesky-social/indigo/api/atproto"
37 atpclient "github.com/bluesky-social/indigo/atproto/client"
38 "github.com/bluesky-social/indigo/atproto/syntax"
39 lexutil "github.com/bluesky-social/indigo/lex/util"
40 securejoin "github.com/cyphar/filepath-securejoin"
41 "github.com/go-chi/chi/v5"
42 "github.com/posthog/posthog-go"
43)
44
45type State struct {
46 db *db.DB
47 notifier notify.Notifier
48 oauth *oauth.OAuth
49 enforcer *rbac.Enforcer
50 pages *pages.Pages
51 sess *session.SessionStore
52 idResolver *idresolver.Resolver
53 posthog posthog.Client
54 jc *jetstream.JetstreamClient
55 config *config.Config
56 repoResolver *reporesolver.RepoResolver
57 knotstream *eventconsumer.Consumer
58 spindlestream *eventconsumer.Consumer
59 logger *slog.Logger
60 validator *validator.Validator
61}
62
63func Make(ctx context.Context, config *config.Config) (*State, error) {
64 d, err := db.Make(config.Core.DbPath)
65 if err != nil {
66 return nil, fmt.Errorf("failed to create db: %w", err)
67 }
68
69 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
70 if err != nil {
71 return nil, fmt.Errorf("failed to create enforcer: %w", err)
72 }
73
74 res, err := idresolver.RedisResolver(config.Redis.ToURL())
75 if err != nil {
76 log.Printf("failed to create redis resolver: %v", err)
77 res = idresolver.DefaultResolver()
78 }
79
80 pages := pages.NewPages(config, res)
81 cache := cache.New(config.Redis.Addr)
82 sess := session.New(cache)
83 oauth2, err := oauth.New(config)
84 if err != nil {
85 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
86 }
87 validator := validator.New(d, res, enforcer)
88
89 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
90 if err != nil {
91 return nil, fmt.Errorf("failed to create posthog client: %w", err)
92 }
93
94 repoResolver := reporesolver.New(config, enforcer, res, d)
95
96 wrapper := db.DbWrapper{Execer: d}
97 jc, err := jetstream.NewJetstreamClient(
98 config.Jetstream.Endpoint,
99 "appview",
100 []string{
101 tangled.GraphFollowNSID,
102 tangled.FeedStarNSID,
103 tangled.PublicKeyNSID,
104 tangled.RepoArtifactNSID,
105 tangled.ActorProfileNSID,
106 tangled.SpindleMemberNSID,
107 tangled.SpindleNSID,
108 tangled.StringNSID,
109 tangled.RepoIssueNSID,
110 tangled.RepoIssueCommentNSID,
111 tangled.LabelDefinitionNSID,
112 tangled.LabelOpNSID,
113 },
114 nil,
115 slog.Default(),
116 wrapper,
117 false,
118
119 // in-memory filter is inapplicalble to appview so
120 // we'll never log dids anyway.
121 false,
122 )
123 if err != nil {
124 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
125 }
126
127 if err := BackfillDefaultDefs(d, res); err != nil {
128 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
129 }
130
131 ingester := appview.Ingester{
132 Db: wrapper,
133 Enforcer: enforcer,
134 IdResolver: res,
135 Config: config,
136 Logger: tlog.New("ingester"),
137 Validator: validator,
138 }
139 err = jc.StartJetstream(ctx, ingester.Ingest())
140 if err != nil {
141 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
142 }
143
144 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
145 if err != nil {
146 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
147 }
148 knotstream.Start(ctx)
149
150 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
151 if err != nil {
152 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
153 }
154 spindlestream.Start(ctx)
155
156 var notifiers []notify.Notifier
157
158 // Always add the database notifier
159 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
160
161 // Add other notifiers in production only
162 if !config.Core.Dev {
163 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
164 }
165 notifier := notify.NewMergedNotifier(notifiers...)
166
167 state := &State{
168 d,
169 notifier,
170 oauth2,
171 enforcer,
172 pages,
173 sess,
174 res,
175 posthog,
176 jc,
177 config,
178 repoResolver,
179 knotstream,
180 spindlestream,
181 slog.Default(),
182 validator,
183 }
184
185 return state, nil
186}
187
188func (s *State) Close() error {
189 // other close up logic goes here
190 return s.db.Close()
191}
192
193func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
194 w.Header().Set("Content-Type", "image/svg+xml")
195 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
196 w.Header().Set("ETag", `"favicon-svg-v1"`)
197
198 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
199 w.WriteHeader(http.StatusNotModified)
200 return
201 }
202
203 s.pages.Favicon(w)
204}
205
206func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
207 w.Header().Set("Content-Type", "text/plain")
208 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
209
210 robotsTxt := `User-agent: *
211Allow: /
212`
213 w.Write([]byte(robotsTxt))
214}
215
// manifestJson is the web app manifest document that PWAManifest writes
// verbatim as its response body.
//
// Reference for the manifest members used below:
// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest
const manifestJson = `{
 "name": "tangled",
 "description": "tightly-knit social coding.",
 "icons": [
 {
 "src": "/favicon.svg",
 "sizes": "144x144"
 }
 ],
 "start_url": "/",
 "id": "org.tangled",

 "display": "standalone",
 "background_color": "#111827",
 "theme_color": "#111827"
}`
233
234func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) {
235 w.Header().Set("Content-Type", "application/json")
236 w.Write([]byte(manifestJson))
237}
238
239func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
240 user := s.oauth.GetUser(r)
241 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
242 LoggedInUser: user,
243 })
244}
245
246func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
247 user := s.oauth.GetUser(r)
248 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
249 LoggedInUser: user,
250 })
251}
252
253func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
254 user := s.oauth.GetUser(r)
255 s.pages.Brand(w, pages.BrandParams{
256 LoggedInUser: user,
257 })
258}
259
260func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
261 if s.oauth.GetUser(r) != nil {
262 s.Timeline(w, r)
263 return
264 }
265 s.Home(w, r)
266}
267
268func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
269 user := s.oauth.GetUser(r)
270
271 var userDid string
272 if user != nil {
273 userDid = user.Did
274 }
275 timeline, err := db.MakeTimeline(s.db, 50, userDid)
276 if err != nil {
277 log.Println(err)
278 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
279 }
280
281 repos, err := db.GetTopStarredReposLastWeek(s.db)
282 if err != nil {
283 log.Println(err)
284 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
285 return
286 }
287
288 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", models.LabelGoodFirstIssue))
289 if err != nil {
290 // non-fatal
291 }
292
293 s.pages.Timeline(w, pages.TimelineParams{
294 LoggedInUser: user,
295 Timeline: timeline,
296 Repos: repos,
297 GfiLabel: gfiLabel,
298 })
299}
300
301func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
302 user := s.oauth.GetUser(r)
303 if user == nil {
304 return
305 }
306
307 l := s.logger.With("handler", "UpgradeBanner")
308 l = l.With("did", user.Did)
309
310 regs, err := db.GetRegistrations(
311 s.db,
312 db.FilterEq("did", user.Did),
313 db.FilterEq("needs_upgrade", 1),
314 )
315 if err != nil {
316 l.Error("non-fatal: failed to get registrations", "err", err)
317 }
318
319 spindles, err := db.GetSpindles(
320 s.db,
321 db.FilterEq("owner", user.Did),
322 db.FilterEq("needs_upgrade", 1),
323 )
324 if err != nil {
325 l.Error("non-fatal: failed to get spindles", "err", err)
326 }
327
328 if regs == nil && spindles == nil {
329 return
330 }
331
332 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
333 Registrations: regs,
334 Spindles: spindles,
335 })
336}
337
338func (s *State) Home(w http.ResponseWriter, r *http.Request) {
339 timeline, err := db.MakeTimeline(s.db, 5, "")
340 if err != nil {
341 log.Println(err)
342 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
343 return
344 }
345
346 repos, err := db.GetTopStarredReposLastWeek(s.db)
347 if err != nil {
348 log.Println(err)
349 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
350 return
351 }
352
353 s.pages.Home(w, pages.TimelineParams{
354 LoggedInUser: nil,
355 Timeline: timeline,
356 Repos: repos,
357 })
358}
359
360func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
361 user := chi.URLParam(r, "user")
362 user = strings.TrimPrefix(user, "@")
363
364 if user == "" {
365 w.WriteHeader(http.StatusBadRequest)
366 return
367 }
368
369 id, err := s.idResolver.ResolveIdent(r.Context(), user)
370 if err != nil {
371 w.WriteHeader(http.StatusInternalServerError)
372 return
373 }
374
375 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
376 if err != nil {
377 w.WriteHeader(http.StatusNotFound)
378 return
379 }
380
381 if len(pubKeys) == 0 {
382 w.WriteHeader(http.StatusNotFound)
383 return
384 }
385
386 for _, k := range pubKeys {
387 key := strings.TrimRight(k.Key, "\n")
388 fmt.Fprintln(w, key)
389 }
390}
391
// validateRepoName reports whether name is acceptable as a repository name.
//
// A valid name is non-empty, consists only of ASCII letters, digits, '-',
// '_' and '.', does not start or end with '.', and contains no "..". Path
// separators ('/' and '\') are rejected up front with their own message.
//
// The returned messages are capitalised because NewRepo shows them to the
// user verbatim.
func validateRepoName(name string) error {
	// an empty name would otherwise slip through every check below
	if name == "" {
		return errors.New("Repository name cannot be empty")
	}

	// check for path traversal attempts
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return errors.New("Repository name contains invalid path characters")
	}

	// leading or trailing dots could be used for traversal when normalized
	// ("./" and "../" cannot occur here: '/' was rejected above)
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return errors.New("Repository name contains invalid path sequence")
	}

	// then continue with character validation
	for _, c := range name {
		switch {
		case c >= 'a' && c <= 'z',
			c >= 'A' && c <= 'Z',
			c >= '0' && c <= '9',
			c == '-', c == '_', c == '.':
			// allowed
		default:
			return errors.New("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// additional check to prevent multiple sequential dots
	if strings.Contains(name, "..") {
		return errors.New("Repository name cannot contain sequential dots")
	}

	// all checks passed
	return nil
}
423
// stripGitExt returns name without a single trailing ".git". Names that do
// not end in ".git" are returned unchanged.
func stripGitExt(name string) string {
	const ext = ".git"
	if !strings.HasSuffix(name, ext) {
		return name
	}
	return name[:len(name)-len(ext)]
}
427
428func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
429 switch r.Method {
430 case http.MethodGet:
431 user := s.oauth.GetUser(r)
432 knots, err := s.enforcer.GetKnotsForUser(user.Did)
433 if err != nil {
434 s.pages.Notice(w, "repo", "Invalid user account.")
435 return
436 }
437
438 s.pages.NewRepo(w, pages.NewRepoParams{
439 LoggedInUser: user,
440 Knots: knots,
441 })
442
443 case http.MethodPost:
444 l := s.logger.With("handler", "NewRepo")
445
446 user := s.oauth.GetUser(r)
447 l = l.With("did", user.Did)
448
449 // form validation
450 domain := r.FormValue("domain")
451 if domain == "" {
452 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
453 return
454 }
455 l = l.With("knot", domain)
456
457 repoName := r.FormValue("name")
458 if repoName == "" {
459 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
460 return
461 }
462
463 if err := validateRepoName(repoName); err != nil {
464 s.pages.Notice(w, "repo", err.Error())
465 return
466 }
467 repoName = stripGitExt(repoName)
468 l = l.With("repoName", repoName)
469
470 defaultBranch := r.FormValue("branch")
471 if defaultBranch == "" {
472 defaultBranch = "main"
473 }
474 l = l.With("defaultBranch", defaultBranch)
475
476 description := r.FormValue("description")
477
478 // ACL validation
479 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
480 if err != nil || !ok {
481 l.Info("unauthorized")
482 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
483 return
484 }
485
486 // Check for existing repos
487 existingRepo, err := db.GetRepo(
488 s.db,
489 db.FilterEq("did", user.Did),
490 db.FilterEq("name", repoName),
491 )
492 if err == nil && existingRepo != nil {
493 l.Info("repo exists")
494 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
495 return
496 }
497
498 // create atproto record for this repo
499 rkey := tid.TID()
500 repo := &models.Repo{
501 Did: user.Did,
502 Name: repoName,
503 Knot: domain,
504 Rkey: rkey,
505 Description: description,
506 Created: time.Now(),
507 Labels: models.DefaultLabelDefs(),
508 }
509 record := repo.AsRecord()
510
511 atpClient, err := s.oauth.AuthorizedClient(r)
512 if err != nil {
513 l.Info("PDS write failed", "err", err)
514 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
515 return
516 }
517
518 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
519 Collection: tangled.RepoNSID,
520 Repo: user.Did,
521 Rkey: rkey,
522 Record: &lexutil.LexiconTypeDecoder{
523 Val: &record,
524 },
525 })
526 if err != nil {
527 l.Info("PDS write failed", "err", err)
528 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
529 return
530 }
531
532 aturi := atresp.Uri
533 l = l.With("aturi", aturi)
534 l.Info("wrote to PDS")
535
536 tx, err := s.db.BeginTx(r.Context(), nil)
537 if err != nil {
538 l.Info("txn failed", "err", err)
539 s.pages.Notice(w, "repo", "Failed to save repository information.")
540 return
541 }
542
543 // The rollback function reverts a few things on failure:
544 // - the pending txn
545 // - the ACLs
546 // - the atproto record created
547 rollback := func() {
548 err1 := tx.Rollback()
549 err2 := s.enforcer.E.LoadPolicy()
550 err3 := rollbackRecord(context.Background(), aturi, atpClient)
551
552 // ignore txn complete errors, this is okay
553 if errors.Is(err1, sql.ErrTxDone) {
554 err1 = nil
555 }
556
557 if errs := errors.Join(err1, err2, err3); errs != nil {
558 l.Error("failed to rollback changes", "errs", errs)
559 return
560 }
561 }
562 defer rollback()
563
564 client, err := s.oauth.ServiceClient(
565 r,
566 oauth.WithService(domain),
567 oauth.WithLxm(tangled.RepoCreateNSID),
568 oauth.WithDev(s.config.Core.Dev),
569 )
570 if err != nil {
571 l.Error("service auth failed", "err", err)
572 s.pages.Notice(w, "repo", "Failed to reach PDS.")
573 return
574 }
575
576 xe := tangled.RepoCreate(
577 r.Context(),
578 client,
579 &tangled.RepoCreate_Input{
580 Rkey: rkey,
581 },
582 )
583 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
584 l.Error("xrpc error", "xe", xe)
585 s.pages.Notice(w, "repo", err.Error())
586 return
587 }
588
589 err = db.AddRepo(tx, repo)
590 if err != nil {
591 l.Error("db write failed", "err", err)
592 s.pages.Notice(w, "repo", "Failed to save repository information.")
593 return
594 }
595
596 // acls
597 p, _ := securejoin.SecureJoin(user.Did, repoName)
598 err = s.enforcer.AddRepo(user.Did, domain, p)
599 if err != nil {
600 l.Error("acl setup failed", "err", err)
601 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
602 return
603 }
604
605 err = tx.Commit()
606 if err != nil {
607 l.Error("txn commit failed", "err", err)
608 http.Error(w, err.Error(), http.StatusInternalServerError)
609 return
610 }
611
612 err = s.enforcer.E.SavePolicy()
613 if err != nil {
614 l.Error("acl save failed", "err", err)
615 http.Error(w, err.Error(), http.StatusInternalServerError)
616 return
617 }
618
619 // reset the ATURI because the transaction completed successfully
620 aturi = ""
621
622 s.notifier.NewRepo(r.Context(), repo)
623 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Did, repoName))
624 }
625}
626
627// this is used to rollback changes made to the PDS
628//
629// it is a no-op if the provided ATURI is empty
630func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
631 if aturi == "" {
632 return nil
633 }
634
635 parsed := syntax.ATURI(aturi)
636
637 collection := parsed.Collection().String()
638 repo := parsed.Authority().String()
639 rkey := parsed.RecordKey().String()
640
641 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
642 Collection: collection,
643 Repo: repo,
644 Rkey: rkey,
645 })
646 return err
647}
648
649func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error {
650 defaults := models.DefaultLabelDefs()
651 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults))
652 if err != nil {
653 return err
654 }
655 // already present
656 if len(defaultLabels) == len(defaults) {
657 return nil
658 }
659
660 labelDefs, err := models.FetchDefaultDefs(r)
661 if err != nil {
662 return err
663 }
664
665 // Insert each label definition to the database
666 for _, labelDef := range labelDefs {
667 _, err = db.AddLabelDefinition(e, &labelDef)
668 if err != nil {
669 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
670 }
671 }
672
673 return nil
674}