1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/appview"
16 "tangled.org/core/appview/cache"
17 "tangled.org/core/appview/cache/session"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 dbnotify "tangled.org/core/appview/notify/db"
23 phnotify "tangled.org/core/appview/notify/posthog"
24 "tangled.org/core/appview/oauth"
25 "tangled.org/core/appview/pages"
26 "tangled.org/core/appview/reporesolver"
27 "tangled.org/core/appview/validator"
28 xrpcclient "tangled.org/core/appview/xrpcclient"
29 "tangled.org/core/eventconsumer"
30 "tangled.org/core/idresolver"
31 "tangled.org/core/jetstream"
32 tlog "tangled.org/core/log"
33 "tangled.org/core/rbac"
34 "tangled.org/core/tid"
35
36 comatproto "github.com/bluesky-social/indigo/api/atproto"
37 atpclient "github.com/bluesky-social/indigo/atproto/client"
38 "github.com/bluesky-social/indigo/atproto/syntax"
39 lexutil "github.com/bluesky-social/indigo/lex/util"
40 securejoin "github.com/cyphar/filepath-securejoin"
41 "github.com/go-chi/chi/v5"
42 "github.com/posthog/posthog-go"
43)
44
45type State struct {
46 db *db.DB
47 notifier notify.Notifier
48 oauth *oauth.OAuth
49 enforcer *rbac.Enforcer
50 pages *pages.Pages
51 sess *session.SessionStore
52 idResolver *idresolver.Resolver
53 posthog posthog.Client
54 jc *jetstream.JetstreamClient
55 config *config.Config
56 repoResolver *reporesolver.RepoResolver
57 knotstream *eventconsumer.Consumer
58 spindlestream *eventconsumer.Consumer
59 logger *slog.Logger
60 validator *validator.Validator
61}
62
63func Make(ctx context.Context, config *config.Config) (*State, error) {
64 d, err := db.Make(config.Core.DbPath)
65 if err != nil {
66 return nil, fmt.Errorf("failed to create db: %w", err)
67 }
68
69 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
70 if err != nil {
71 return nil, fmt.Errorf("failed to create enforcer: %w", err)
72 }
73
74 res, err := idresolver.RedisResolver(config.Redis.ToURL())
75 if err != nil {
76 log.Printf("failed to create redis resolver: %v", err)
77 res = idresolver.DefaultResolver()
78 }
79
80 pages := pages.NewPages(config, res)
81 cache := cache.New(config.Redis.Addr)
82 sess := session.New(cache)
83 oauth2, err := oauth.New(config)
84 if err != nil {
85 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
86 }
87 validator := validator.New(d, res, enforcer)
88
89 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
90 if err != nil {
91 return nil, fmt.Errorf("failed to create posthog client: %w", err)
92 }
93
94 repoResolver := reporesolver.New(config, enforcer, res, d)
95
96 wrapper := db.DbWrapper{Execer: d}
97 jc, err := jetstream.NewJetstreamClient(
98 config.Jetstream.Endpoint,
99 "appview",
100 []string{
101 tangled.GraphFollowNSID,
102 tangled.FeedStarNSID,
103 tangled.PublicKeyNSID,
104 tangled.RepoArtifactNSID,
105 tangled.ActorProfileNSID,
106 tangled.SpindleMemberNSID,
107 tangled.SpindleNSID,
108 tangled.StringNSID,
109 tangled.RepoIssueNSID,
110 tangled.RepoIssueCommentNSID,
111 tangled.LabelDefinitionNSID,
112 tangled.LabelOpNSID,
113 },
114 nil,
115 slog.Default(),
116 wrapper,
117 false,
118
119 // in-memory filter is inapplicalble to appview so
120 // we'll never log dids anyway.
121 false,
122 )
123 if err != nil {
124 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
125 }
126
127 if err := BackfillDefaultDefs(d, res); err != nil {
128 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
129 }
130
131 ingester := appview.Ingester{
132 Db: wrapper,
133 Enforcer: enforcer,
134 IdResolver: res,
135 Config: config,
136 Logger: tlog.New("ingester"),
137 Validator: validator,
138 }
139 err = jc.StartJetstream(ctx, ingester.Ingest())
140 if err != nil {
141 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
142 }
143
144 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
145 if err != nil {
146 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
147 }
148 knotstream.Start(ctx)
149
150 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
151 if err != nil {
152 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
153 }
154 spindlestream.Start(ctx)
155
156 var notifiers []notify.Notifier
157
158 // Always add the database notifier
159 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
160
161 // Add other notifiers in production only
162 if !config.Core.Dev {
163 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
164 }
165 notifier := notify.NewMergedNotifier(notifiers...)
166
167 state := &State{
168 d,
169 notifier,
170 oauth2,
171 enforcer,
172 pages,
173 sess,
174 res,
175 posthog,
176 jc,
177 config,
178 repoResolver,
179 knotstream,
180 spindlestream,
181 slog.Default(),
182 validator,
183 }
184
185 return state, nil
186}
187
188func (s *State) Close() error {
189 // other close up logic goes here
190 return s.db.Close()
191}
192
193func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
194 w.Header().Set("Content-Type", "image/svg+xml")
195 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
196 w.Header().Set("ETag", `"favicon-svg-v1"`)
197
198 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
199 w.WriteHeader(http.StatusNotModified)
200 return
201 }
202
203 s.pages.Favicon(w)
204}
205
206// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest
207const manifestJson = `{
208 "name": "tangled",
209 "description": "tightly-knit social coding.",
210 "icons": [
211 {
212 "src": "/favicon.svg",
213 "sizes": "144x144"
214 }
215 ],
216 "start_url": "/",
217 "id": "org.tangled",
218
219 "display": "standalone",
220 "background_color": "#111827",
221 "theme_color": "#111827"
222}`
223
224func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) {
225 w.Header().Set("Content-Type", "application/json")
226 w.Write([]byte(manifestJson))
227}
228
229func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
230 user := s.oauth.GetUser(r)
231 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
232 LoggedInUser: user,
233 })
234}
235
236func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
237 user := s.oauth.GetUser(r)
238 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
239 LoggedInUser: user,
240 })
241}
242
243func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
244 user := s.oauth.GetUser(r)
245 s.pages.Brand(w, pages.BrandParams{
246 LoggedInUser: user,
247 })
248}
249
250func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
251 if s.oauth.GetUser(r) != nil {
252 s.Timeline(w, r)
253 return
254 }
255 s.Home(w, r)
256}
257
258func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
259 user := s.oauth.GetUser(r)
260
261 var userDid string
262 if user != nil {
263 userDid = user.Did
264 }
265 timeline, err := db.MakeTimeline(s.db, 50, userDid)
266 if err != nil {
267 log.Println(err)
268 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
269 }
270
271 repos, err := db.GetTopStarredReposLastWeek(s.db)
272 if err != nil {
273 log.Println(err)
274 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
275 return
276 }
277
278 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", models.LabelGoodFirstIssue))
279 if err != nil {
280 // non-fatal
281 }
282
283 s.pages.Timeline(w, pages.TimelineParams{
284 LoggedInUser: user,
285 Timeline: timeline,
286 Repos: repos,
287 GfiLabel: gfiLabel,
288 })
289}
290
291func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
292 user := s.oauth.GetUser(r)
293 if user == nil {
294 return
295 }
296
297 l := s.logger.With("handler", "UpgradeBanner")
298 l = l.With("did", user.Did)
299
300 regs, err := db.GetRegistrations(
301 s.db,
302 db.FilterEq("did", user.Did),
303 db.FilterEq("needs_upgrade", 1),
304 )
305 if err != nil {
306 l.Error("non-fatal: failed to get registrations", "err", err)
307 }
308
309 spindles, err := db.GetSpindles(
310 s.db,
311 db.FilterEq("owner", user.Did),
312 db.FilterEq("needs_upgrade", 1),
313 )
314 if err != nil {
315 l.Error("non-fatal: failed to get spindles", "err", err)
316 }
317
318 if regs == nil && spindles == nil {
319 return
320 }
321
322 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
323 Registrations: regs,
324 Spindles: spindles,
325 })
326}
327
328func (s *State) Home(w http.ResponseWriter, r *http.Request) {
329 timeline, err := db.MakeTimeline(s.db, 5, "")
330 if err != nil {
331 log.Println(err)
332 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
333 return
334 }
335
336 repos, err := db.GetTopStarredReposLastWeek(s.db)
337 if err != nil {
338 log.Println(err)
339 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
340 return
341 }
342
343 s.pages.Home(w, pages.TimelineParams{
344 LoggedInUser: nil,
345 Timeline: timeline,
346 Repos: repos,
347 })
348}
349
350func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
351 user := chi.URLParam(r, "user")
352 user = strings.TrimPrefix(user, "@")
353
354 if user == "" {
355 w.WriteHeader(http.StatusBadRequest)
356 return
357 }
358
359 id, err := s.idResolver.ResolveIdent(r.Context(), user)
360 if err != nil {
361 w.WriteHeader(http.StatusInternalServerError)
362 return
363 }
364
365 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
366 if err != nil {
367 w.WriteHeader(http.StatusNotFound)
368 return
369 }
370
371 if len(pubKeys) == 0 {
372 w.WriteHeader(http.StatusNotFound)
373 return
374 }
375
376 for _, k := range pubKeys {
377 key := strings.TrimRight(k.Key, "\n")
378 fmt.Fprintln(w, key)
379 }
380}
381
382func validateRepoName(name string) error {
383 // check for path traversal attempts
384 if name == "." || name == ".." ||
385 strings.Contains(name, "/") || strings.Contains(name, "\\") {
386 return fmt.Errorf("Repository name contains invalid path characters")
387 }
388
389 // check for sequences that could be used for traversal when normalized
390 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
391 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
392 return fmt.Errorf("Repository name contains invalid path sequence")
393 }
394
395 // then continue with character validation
396 for _, char := range name {
397 if !((char >= 'a' && char <= 'z') ||
398 (char >= 'A' && char <= 'Z') ||
399 (char >= '0' && char <= '9') ||
400 char == '-' || char == '_' || char == '.') {
401 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
402 }
403 }
404
405 // additional check to prevent multiple sequential dots
406 if strings.Contains(name, "..") {
407 return fmt.Errorf("Repository name cannot contain sequential dots")
408 }
409
410 // if all checks pass
411 return nil
412}
413
414func stripGitExt(name string) string {
415 return strings.TrimSuffix(name, ".git")
416}
417
418func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
419 switch r.Method {
420 case http.MethodGet:
421 user := s.oauth.GetUser(r)
422 knots, err := s.enforcer.GetKnotsForUser(user.Did)
423 if err != nil {
424 s.pages.Notice(w, "repo", "Invalid user account.")
425 return
426 }
427
428 s.pages.NewRepo(w, pages.NewRepoParams{
429 LoggedInUser: user,
430 Knots: knots,
431 })
432
433 case http.MethodPost:
434 l := s.logger.With("handler", "NewRepo")
435
436 user := s.oauth.GetUser(r)
437 l = l.With("did", user.Did)
438
439 // form validation
440 domain := r.FormValue("domain")
441 if domain == "" {
442 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
443 return
444 }
445 l = l.With("knot", domain)
446
447 repoName := r.FormValue("name")
448 if repoName == "" {
449 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
450 return
451 }
452
453 if err := validateRepoName(repoName); err != nil {
454 s.pages.Notice(w, "repo", err.Error())
455 return
456 }
457 repoName = stripGitExt(repoName)
458 l = l.With("repoName", repoName)
459
460 defaultBranch := r.FormValue("branch")
461 if defaultBranch == "" {
462 defaultBranch = "main"
463 }
464 l = l.With("defaultBranch", defaultBranch)
465
466 description := r.FormValue("description")
467
468 // ACL validation
469 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
470 if err != nil || !ok {
471 l.Info("unauthorized")
472 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
473 return
474 }
475
476 // Check for existing repos
477 existingRepo, err := db.GetRepo(
478 s.db,
479 db.FilterEq("did", user.Did),
480 db.FilterEq("name", repoName),
481 )
482 if err == nil && existingRepo != nil {
483 l.Info("repo exists")
484 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
485 return
486 }
487
488 // create atproto record for this repo
489 rkey := tid.TID()
490 repo := &models.Repo{
491 Did: user.Did,
492 Name: repoName,
493 Knot: domain,
494 Rkey: rkey,
495 Description: description,
496 Created: time.Now(),
497 Labels: models.DefaultLabelDefs(),
498 }
499 record := repo.AsRecord()
500
501 atpClient, err := s.oauth.AuthorizedClient(r)
502 if err != nil {
503 l.Info("PDS write failed", "err", err)
504 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
505 return
506 }
507
508 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
509 Collection: tangled.RepoNSID,
510 Repo: user.Did,
511 Rkey: rkey,
512 Record: &lexutil.LexiconTypeDecoder{
513 Val: &record,
514 },
515 })
516 if err != nil {
517 l.Info("PDS write failed", "err", err)
518 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
519 return
520 }
521
522 aturi := atresp.Uri
523 l = l.With("aturi", aturi)
524 l.Info("wrote to PDS")
525
526 tx, err := s.db.BeginTx(r.Context(), nil)
527 if err != nil {
528 l.Info("txn failed", "err", err)
529 s.pages.Notice(w, "repo", "Failed to save repository information.")
530 return
531 }
532
533 // The rollback function reverts a few things on failure:
534 // - the pending txn
535 // - the ACLs
536 // - the atproto record created
537 rollback := func() {
538 err1 := tx.Rollback()
539 err2 := s.enforcer.E.LoadPolicy()
540 err3 := rollbackRecord(context.Background(), aturi, atpClient)
541
542 // ignore txn complete errors, this is okay
543 if errors.Is(err1, sql.ErrTxDone) {
544 err1 = nil
545 }
546
547 if errs := errors.Join(err1, err2, err3); errs != nil {
548 l.Error("failed to rollback changes", "errs", errs)
549 return
550 }
551 }
552 defer rollback()
553
554 client, err := s.oauth.ServiceClient(
555 r,
556 oauth.WithService(domain),
557 oauth.WithLxm(tangled.RepoCreateNSID),
558 oauth.WithDev(s.config.Core.Dev),
559 )
560 if err != nil {
561 l.Error("service auth failed", "err", err)
562 s.pages.Notice(w, "repo", "Failed to reach PDS.")
563 return
564 }
565
566 xe := tangled.RepoCreate(
567 r.Context(),
568 client,
569 &tangled.RepoCreate_Input{
570 Rkey: rkey,
571 },
572 )
573 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
574 l.Error("xrpc error", "xe", xe)
575 s.pages.Notice(w, "repo", err.Error())
576 return
577 }
578
579 err = db.AddRepo(tx, repo)
580 if err != nil {
581 l.Error("db write failed", "err", err)
582 s.pages.Notice(w, "repo", "Failed to save repository information.")
583 return
584 }
585
586 // acls
587 p, _ := securejoin.SecureJoin(user.Did, repoName)
588 err = s.enforcer.AddRepo(user.Did, domain, p)
589 if err != nil {
590 l.Error("acl setup failed", "err", err)
591 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
592 return
593 }
594
595 err = tx.Commit()
596 if err != nil {
597 l.Error("txn commit failed", "err", err)
598 http.Error(w, err.Error(), http.StatusInternalServerError)
599 return
600 }
601
602 err = s.enforcer.E.SavePolicy()
603 if err != nil {
604 l.Error("acl save failed", "err", err)
605 http.Error(w, err.Error(), http.StatusInternalServerError)
606 return
607 }
608
609 // reset the ATURI because the transaction completed successfully
610 aturi = ""
611
612 s.notifier.NewRepo(r.Context(), repo)
613 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Did, repoName))
614 }
615}
616
617// this is used to rollback changes made to the PDS
618//
619// it is a no-op if the provided ATURI is empty
620func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
621 if aturi == "" {
622 return nil
623 }
624
625 parsed := syntax.ATURI(aturi)
626
627 collection := parsed.Collection().String()
628 repo := parsed.Authority().String()
629 rkey := parsed.RecordKey().String()
630
631 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
632 Collection: collection,
633 Repo: repo,
634 Rkey: rkey,
635 })
636 return err
637}
638
639func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error {
640 defaults := models.DefaultLabelDefs()
641 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults))
642 if err != nil {
643 return err
644 }
645 // already present
646 if len(defaultLabels) == len(defaults) {
647 return nil
648 }
649
650 labelDefs, err := models.FetchDefaultDefs(r)
651 if err != nil {
652 return err
653 }
654
655 // Insert each label definition to the database
656 for _, labelDef := range labelDefs {
657 _, err = db.AddLabelDefinition(e, &labelDef)
658 if err != nil {
659 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
660 }
661 }
662
663 return nil
664}