1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/config"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/indexer"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/appview/notify"
20 dbnotify "tangled.org/core/appview/notify/db"
21 phnotify "tangled.org/core/appview/notify/posthog"
22 "tangled.org/core/appview/oauth"
23 "tangled.org/core/appview/pages"
24 "tangled.org/core/appview/reporesolver"
25 "tangled.org/core/appview/validator"
26 xrpcclient "tangled.org/core/appview/xrpcclient"
27 "tangled.org/core/eventconsumer"
28 "tangled.org/core/idresolver"
29 "tangled.org/core/jetstream"
30 "tangled.org/core/log"
31 tlog "tangled.org/core/log"
32 "tangled.org/core/rbac"
33 "tangled.org/core/tid"
34
35 comatproto "github.com/bluesky-social/indigo/api/atproto"
36 atpclient "github.com/bluesky-social/indigo/atproto/client"
37 "github.com/bluesky-social/indigo/atproto/syntax"
38 lexutil "github.com/bluesky-social/indigo/lex/util"
39 securejoin "github.com/cyphar/filepath-securejoin"
40 "github.com/go-chi/chi/v5"
41 "github.com/posthog/posthog-go"
42)
43
// State holds the long-lived dependencies that the appview handlers in this
// package operate on: the database handle, notifier, indexer, OAuth helper,
// RBAC enforcer, page renderer, identity resolver, PostHog client, jetstream
// client, configuration, repo resolver, the knot and spindle event consumers,
// logger and validator.
//
// A State is built by Make and released with Close.
type State struct {
	db            *db.DB
	notifier      notify.Notifier
	indexer       *indexer.Indexer
	oauth         *oauth.OAuth
	enforcer      *rbac.Enforcer
	pages         *pages.Pages
	idResolver    *idresolver.Resolver
	posthog       posthog.Client
	jc            *jetstream.JetstreamClient
	config        *config.Config
	repoResolver  *reporesolver.RepoResolver
	knotstream    *eventconsumer.Consumer
	spindlestream *eventconsumer.Consumer
	logger        *slog.Logger
	validator     *validator.Validator
}
61
62func Make(ctx context.Context, config *config.Config) (*State, error) {
63 logger := tlog.FromContext(ctx)
64
65 d, err := db.Make(ctx, config.Core.DbPath)
66 if err != nil {
67 return nil, fmt.Errorf("failed to create db: %w", err)
68 }
69
70 indexer := indexer.New(log.SubLogger(logger, "indexer"))
71 err = indexer.Init(ctx, d)
72 if err != nil {
73 return nil, fmt.Errorf("failed to create indexer: %w", err)
74 }
75
76 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
77 if err != nil {
78 return nil, fmt.Errorf("failed to create enforcer: %w", err)
79 }
80
81 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
82 if err != nil {
83 logger.Error("failed to create redis resolver", "err", err)
84 res = idresolver.DefaultResolver(config.Plc.PLCURL)
85 }
86
87 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
88 if err != nil {
89 return nil, fmt.Errorf("failed to create posthog client: %w", err)
90 }
91
92 pages := pages.NewPages(config, res, log.SubLogger(logger, "pages"))
93 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
94 if err != nil {
95 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
96 }
97 validator := validator.New(d, res, enforcer)
98
99 repoResolver := reporesolver.New(config, enforcer, res, d)
100
101 wrapper := db.DbWrapper{Execer: d}
102 jc, err := jetstream.NewJetstreamClient(
103 config.Jetstream.Endpoint,
104 "appview",
105 []string{
106 tangled.GraphFollowNSID,
107 tangled.FeedStarNSID,
108 tangled.PublicKeyNSID,
109 tangled.RepoArtifactNSID,
110 tangled.ActorProfileNSID,
111 tangled.SpindleMemberNSID,
112 tangled.SpindleNSID,
113 tangled.StringNSID,
114 tangled.RepoIssueNSID,
115 tangled.RepoIssueCommentNSID,
116 tangled.LabelDefinitionNSID,
117 tangled.LabelOpNSID,
118 },
119 nil,
120 tlog.SubLogger(logger, "jetstream"),
121 wrapper,
122 false,
123
124 // in-memory filter is inapplicalble to appview so
125 // we'll never log dids anyway.
126 false,
127 )
128 if err != nil {
129 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
130 }
131
132 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
133 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
134 }
135
136 ingester := appview.Ingester{
137 Db: wrapper,
138 Enforcer: enforcer,
139 IdResolver: res,
140 Config: config,
141 Logger: log.SubLogger(logger, "ingester"),
142 Validator: validator,
143 }
144 err = jc.StartJetstream(ctx, ingester.Ingest())
145 if err != nil {
146 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
147 }
148
149 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
150 if err != nil {
151 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
152 }
153 knotstream.Start(ctx)
154
155 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
156 if err != nil {
157 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
158 }
159 spindlestream.Start(ctx)
160
161 var notifiers []notify.Notifier
162
163 // Always add the database notifier
164 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
165
166 // Add other notifiers in production only
167 if !config.Core.Dev {
168 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
169 }
170 notifiers = append(notifiers, indexer)
171 notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify"))
172
173 state := &State{
174 d,
175 notifier,
176 indexer,
177 oauth,
178 enforcer,
179 pages,
180 res,
181 posthog,
182 jc,
183 config,
184 repoResolver,
185 knotstream,
186 spindlestream,
187 logger,
188 validator,
189 }
190
191 return state, nil
192}
193
194func (s *State) Close() error {
195 // other close up logic goes here
196 return s.db.Close()
197}
198
199func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
200 w.Header().Set("Content-Type", "image/svg+xml")
201 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
202 w.Header().Set("ETag", `"favicon-svg-v1"`)
203
204 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
205 w.WriteHeader(http.StatusNotModified)
206 return
207 }
208
209 s.pages.Favicon(w)
210}
211
212func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
213 w.Header().Set("Content-Type", "text/plain")
214 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
215
216 robotsTxt := `User-agent: *
217Allow: /
218`
219 w.Write([]byte(robotsTxt))
220}
221
// manifestJson is the web app manifest written out by PWAManifest. It declares
// the app name and description, a single icon (/favicon.svg), the start URL
// and id, standalone display mode, and the background and theme colors.
//
// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest
const manifestJson = `{
 "name": "tangled",
 "description": "tightly-knit social coding.",
 "icons": [
 {
 "src": "/favicon.svg",
 "sizes": "144x144"
 }
 ],
 "start_url": "/",
 "id": "org.tangled",

 "display": "standalone",
 "background_color": "#111827",
 "theme_color": "#111827"
}`
239
240func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) {
241 w.Header().Set("Content-Type", "application/json")
242 w.Write([]byte(manifestJson))
243}
244
245func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
246 user := s.oauth.GetUser(r)
247 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
248 LoggedInUser: user,
249 })
250}
251
252func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
253 user := s.oauth.GetUser(r)
254 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
255 LoggedInUser: user,
256 })
257}
258
259func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
260 user := s.oauth.GetUser(r)
261 s.pages.Brand(w, pages.BrandParams{
262 LoggedInUser: user,
263 })
264}
265
266func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
267 if s.oauth.GetUser(r) != nil {
268 s.Timeline(w, r)
269 return
270 }
271 s.Home(w, r)
272}
273
274func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
275 user := s.oauth.GetUser(r)
276
277 // TODO: set this flag based on the UI
278 filtered := false
279
280 var userDid string
281 if user != nil {
282 userDid = user.Did
283 }
284 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
285 if err != nil {
286 s.logger.Error("failed to make timeline", "err", err)
287 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
288 }
289
290 repos, err := db.GetTopStarredReposLastWeek(s.db)
291 if err != nil {
292 s.logger.Error("failed to get top starred repos", "err", err)
293 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
294 return
295 }
296
297 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", s.config.Label.GoodFirstIssue))
298 if err != nil {
299 // non-fatal
300 }
301
302 s.pages.Timeline(w, pages.TimelineParams{
303 LoggedInUser: user,
304 Timeline: timeline,
305 Repos: repos,
306 GfiLabel: gfiLabel,
307 })
308}
309
310func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
311 user := s.oauth.GetUser(r)
312 if user == nil {
313 return
314 }
315
316 l := s.logger.With("handler", "UpgradeBanner")
317 l = l.With("did", user.Did)
318
319 regs, err := db.GetRegistrations(
320 s.db,
321 db.FilterEq("did", user.Did),
322 db.FilterEq("needs_upgrade", 1),
323 )
324 if err != nil {
325 l.Error("non-fatal: failed to get registrations", "err", err)
326 }
327
328 spindles, err := db.GetSpindles(
329 s.db,
330 db.FilterEq("owner", user.Did),
331 db.FilterEq("needs_upgrade", 1),
332 )
333 if err != nil {
334 l.Error("non-fatal: failed to get spindles", "err", err)
335 }
336
337 if regs == nil && spindles == nil {
338 return
339 }
340
341 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
342 Registrations: regs,
343 Spindles: spindles,
344 })
345}
346
347func (s *State) Home(w http.ResponseWriter, r *http.Request) {
348 // TODO: set this flag based on the UI
349 filtered := false
350
351 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
352 if err != nil {
353 s.logger.Error("failed to make timeline", "err", err)
354 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
355 return
356 }
357
358 repos, err := db.GetTopStarredReposLastWeek(s.db)
359 if err != nil {
360 s.logger.Error("failed to get top starred repos", "err", err)
361 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
362 return
363 }
364
365 s.pages.Home(w, pages.TimelineParams{
366 LoggedInUser: nil,
367 Timeline: timeline,
368 Repos: repos,
369 })
370}
371
372func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
373 user := chi.URLParam(r, "user")
374 user = strings.TrimPrefix(user, "@")
375
376 if user == "" {
377 w.WriteHeader(http.StatusBadRequest)
378 return
379 }
380
381 id, err := s.idResolver.ResolveIdent(r.Context(), user)
382 if err != nil {
383 w.WriteHeader(http.StatusInternalServerError)
384 return
385 }
386
387 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
388 if err != nil {
389 s.logger.Error("failed to get public keys", "err", err)
390 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
391 return
392 }
393
394 if len(pubKeys) == 0 {
395 w.WriteHeader(http.StatusNoContent)
396 return
397 }
398
399 for _, k := range pubKeys {
400 key := strings.TrimRight(k.Key, "\n")
401 fmt.Fprintln(w, key)
402 }
403}
404
// validateRepoName reports whether name is acceptable as a repository name.
//
// The checks run in this order, and the first one that fails decides the
// error message:
//  1. name is "." or "..", or contains a slash or backslash;
//  2. name starts or ends with a period;
//  3. name contains a character other than ASCII letters, digits, '-', '_'
//     and '.';
//  4. name contains two consecutive periods.
//
// It returns nil when every check passes. The empty string passes all checks.
func validateRepoName(name string) error {
	switch {
	case name == ".", name == "..", strings.ContainsAny(name, `/\`):
		// path separators and the special directory names
		return fmt.Errorf("Repository name contains invalid path characters")
	case strings.HasPrefix(name, "."), strings.HasSuffix(name, "."):
		// leading/trailing dots could turn into traversal once normalized
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	disallowed := func(r rune) bool {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
			return false
		case r == '-', r == '_', r == '.':
			return false
		}
		return true
	}
	if strings.IndexFunc(name, disallowed) >= 0 {
		return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
	}

	// dots are allowed individually, but never back to back
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
436
// stripGitExt returns name without a single trailing ".git"; names that do
// not end in ".git" are returned unchanged.
func stripGitExt(name string) string {
	const ext = ".git"
	if !strings.HasSuffix(name, ext) {
		return name
	}
	return name[:len(name)-len(ext)]
}
440
441func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
442 switch r.Method {
443 case http.MethodGet:
444 user := s.oauth.GetUser(r)
445 knots, err := s.enforcer.GetKnotsForUser(user.Did)
446 if err != nil {
447 s.pages.Notice(w, "repo", "Invalid user account.")
448 return
449 }
450
451 s.pages.NewRepo(w, pages.NewRepoParams{
452 LoggedInUser: user,
453 Knots: knots,
454 })
455
456 case http.MethodPost:
457 l := s.logger.With("handler", "NewRepo")
458
459 user := s.oauth.GetUser(r)
460 l = l.With("did", user.Did)
461
462 // form validation
463 domain := r.FormValue("domain")
464 if domain == "" {
465 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
466 return
467 }
468 l = l.With("knot", domain)
469
470 repoName := r.FormValue("name")
471 if repoName == "" {
472 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
473 return
474 }
475
476 if err := validateRepoName(repoName); err != nil {
477 s.pages.Notice(w, "repo", err.Error())
478 return
479 }
480 repoName = stripGitExt(repoName)
481 l = l.With("repoName", repoName)
482
483 defaultBranch := r.FormValue("branch")
484 if defaultBranch == "" {
485 defaultBranch = "main"
486 }
487 l = l.With("defaultBranch", defaultBranch)
488
489 description := r.FormValue("description")
490
491 // ACL validation
492 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
493 if err != nil || !ok {
494 l.Info("unauthorized")
495 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
496 return
497 }
498
499 // Check for existing repos
500 existingRepo, err := db.GetRepo(
501 s.db,
502 db.FilterEq("did", user.Did),
503 db.FilterEq("name", repoName),
504 )
505 if err == nil && existingRepo != nil {
506 l.Info("repo exists")
507 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
508 return
509 }
510
511 // create atproto record for this repo
512 rkey := tid.TID()
513 repo := &models.Repo{
514 Did: user.Did,
515 Name: repoName,
516 Knot: domain,
517 Rkey: rkey,
518 Description: description,
519 Created: time.Now(),
520 Labels: s.config.Label.DefaultLabelDefs,
521 }
522 record := repo.AsRecord()
523
524 atpClient, err := s.oauth.AuthorizedClient(r)
525 if err != nil {
526 l.Info("PDS write failed", "err", err)
527 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
528 return
529 }
530
531 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
532 Collection: tangled.RepoNSID,
533 Repo: user.Did,
534 Rkey: rkey,
535 Record: &lexutil.LexiconTypeDecoder{
536 Val: &record,
537 },
538 })
539 if err != nil {
540 l.Info("PDS write failed", "err", err)
541 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
542 return
543 }
544
545 aturi := atresp.Uri
546 l = l.With("aturi", aturi)
547 l.Info("wrote to PDS")
548
549 tx, err := s.db.BeginTx(r.Context(), nil)
550 if err != nil {
551 l.Info("txn failed", "err", err)
552 s.pages.Notice(w, "repo", "Failed to save repository information.")
553 return
554 }
555
556 // The rollback function reverts a few things on failure:
557 // - the pending txn
558 // - the ACLs
559 // - the atproto record created
560 rollback := func() {
561 err1 := tx.Rollback()
562 err2 := s.enforcer.E.LoadPolicy()
563 err3 := rollbackRecord(context.Background(), aturi, atpClient)
564
565 // ignore txn complete errors, this is okay
566 if errors.Is(err1, sql.ErrTxDone) {
567 err1 = nil
568 }
569
570 if errs := errors.Join(err1, err2, err3); errs != nil {
571 l.Error("failed to rollback changes", "errs", errs)
572 return
573 }
574 }
575 defer rollback()
576
577 client, err := s.oauth.ServiceClient(
578 r,
579 oauth.WithService(domain),
580 oauth.WithLxm(tangled.RepoCreateNSID),
581 oauth.WithDev(s.config.Core.Dev),
582 )
583 if err != nil {
584 l.Error("service auth failed", "err", err)
585 s.pages.Notice(w, "repo", "Failed to reach PDS.")
586 return
587 }
588
589 xe := tangled.RepoCreate(
590 r.Context(),
591 client,
592 &tangled.RepoCreate_Input{
593 Rkey: rkey,
594 },
595 )
596 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
597 l.Error("xrpc error", "xe", xe)
598 s.pages.Notice(w, "repo", err.Error())
599 return
600 }
601
602 err = db.AddRepo(tx, repo)
603 if err != nil {
604 l.Error("db write failed", "err", err)
605 s.pages.Notice(w, "repo", "Failed to save repository information.")
606 return
607 }
608
609 // acls
610 p, _ := securejoin.SecureJoin(user.Did, repoName)
611 err = s.enforcer.AddRepo(user.Did, domain, p)
612 if err != nil {
613 l.Error("acl setup failed", "err", err)
614 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
615 return
616 }
617
618 err = tx.Commit()
619 if err != nil {
620 l.Error("txn commit failed", "err", err)
621 http.Error(w, err.Error(), http.StatusInternalServerError)
622 return
623 }
624
625 err = s.enforcer.E.SavePolicy()
626 if err != nil {
627 l.Error("acl save failed", "err", err)
628 http.Error(w, err.Error(), http.StatusInternalServerError)
629 return
630 }
631
632 // reset the ATURI because the transaction completed successfully
633 aturi = ""
634
635 s.notifier.NewRepo(r.Context(), repo)
636 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, repoName))
637 }
638}
639
640// this is used to rollback changes made to the PDS
641//
642// it is a no-op if the provided ATURI is empty
643func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
644 if aturi == "" {
645 return nil
646 }
647
648 parsed := syntax.ATURI(aturi)
649
650 collection := parsed.Collection().String()
651 repo := parsed.Authority().String()
652 rkey := parsed.RecordKey().String()
653
654 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
655 Collection: collection,
656 Repo: repo,
657 Rkey: rkey,
658 })
659 return err
660}
661
662func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
663 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults))
664 if err != nil {
665 return err
666 }
667 // already present
668 if len(defaultLabels) == len(defaults) {
669 return nil
670 }
671
672 labelDefs, err := models.FetchLabelDefs(r, defaults)
673 if err != nil {
674 return err
675 }
676
677 // Insert each label definition to the database
678 for _, labelDef := range labelDefs {
679 _, err = db.AddLabelDefinition(e, &labelDef)
680 if err != nil {
681 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
682 }
683 }
684
685 return nil
686}