1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/config"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/models"
18 "tangled.org/core/appview/notify"
19 dbnotify "tangled.org/core/appview/notify/db"
20 phnotify "tangled.org/core/appview/notify/posthog"
21 "tangled.org/core/appview/oauth"
22 "tangled.org/core/appview/pages"
23 "tangled.org/core/appview/reporesolver"
24 "tangled.org/core/appview/validator"
25 xrpcclient "tangled.org/core/appview/xrpcclient"
26 "tangled.org/core/eventconsumer"
27 "tangled.org/core/idresolver"
28 "tangled.org/core/jetstream"
29 "tangled.org/core/log"
30 tlog "tangled.org/core/log"
31 "tangled.org/core/rbac"
32 "tangled.org/core/tid"
33
34 comatproto "github.com/bluesky-social/indigo/api/atproto"
35 atpclient "github.com/bluesky-social/indigo/atproto/client"
36 "github.com/bluesky-social/indigo/atproto/syntax"
37 lexutil "github.com/bluesky-social/indigo/lex/util"
38 securejoin "github.com/cyphar/filepath-securejoin"
39 "github.com/go-chi/chi/v5"
40 "github.com/posthog/posthog-go"
41)
42
// State bundles the long-lived dependencies shared by the appview's HTTP
// handlers. Values are created by Make and released by Close.
type State struct {
	db            *db.DB                     // database handle; closed by Close
	notifier      notify.Notifier            // NewRepo announces new repositories through it
	oauth         *oauth.OAuth               // resolves the logged-in user and builds authorized clients
	enforcer      *rbac.Enforcer             // consulted for ACL checks and updated when repos are created
	pages         *pages.Pages               // renders pages and notices into the response
	idResolver    *idresolver.Resolver       // resolves user identifiers (see Keys)
	posthog       posthog.Client             // posthog client
	jc            *jetstream.JetstreamClient // jetstream client started in Make
	config        *config.Config             // appview configuration passed to Make
	repoResolver  *reporesolver.RepoResolver // repo resolver
	knotstream    *eventconsumer.Consumer    // knotstream event consumer started in Make
	spindlestream *eventconsumer.Consumer    // spindlestream event consumer started in Make
	logger        *slog.Logger               // base logger; handlers derive sub-loggers with With
	validator     *validator.Validator       // validator
}
59
60func Make(ctx context.Context, config *config.Config) (*State, error) {
61 logger := tlog.FromContext(ctx)
62
63 d, err := db.Make(ctx, config.Core.DbPath)
64 if err != nil {
65 return nil, fmt.Errorf("failed to create db: %w", err)
66 }
67
68 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
69 if err != nil {
70 return nil, fmt.Errorf("failed to create enforcer: %w", err)
71 }
72
73 res, err := idresolver.RedisResolver(config.Redis.ToURL())
74 if err != nil {
75 logger.Error("failed to create redis resolver", "err", err)
76 res = idresolver.DefaultResolver()
77 }
78
79 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
80 if err != nil {
81 return nil, fmt.Errorf("failed to create posthog client: %w", err)
82 }
83
84 pages := pages.NewPages(config, res, log.SubLogger(logger, "pages"))
85 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
86 if err != nil {
87 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
88 }
89 validator := validator.New(d, res, enforcer)
90
91 repoResolver := reporesolver.New(config, enforcer, res, d)
92
93 wrapper := db.DbWrapper{Execer: d}
94 jc, err := jetstream.NewJetstreamClient(
95 config.Jetstream.Endpoint,
96 "appview",
97 []string{
98 tangled.GraphFollowNSID,
99 tangled.FeedStarNSID,
100 tangled.PublicKeyNSID,
101 tangled.RepoArtifactNSID,
102 tangled.ActorProfileNSID,
103 tangled.SpindleMemberNSID,
104 tangled.SpindleNSID,
105 tangled.StringNSID,
106 tangled.RepoIssueNSID,
107 tangled.RepoIssueCommentNSID,
108 tangled.LabelDefinitionNSID,
109 tangled.LabelOpNSID,
110 },
111 nil,
112 tlog.SubLogger(logger, "jetstream"),
113 wrapper,
114 false,
115
116 // in-memory filter is inapplicalble to appview so
117 // we'll never log dids anyway.
118 false,
119 )
120 if err != nil {
121 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
122 }
123
124 if err := BackfillDefaultDefs(d, res); err != nil {
125 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
126 }
127
128 ingester := appview.Ingester{
129 Db: wrapper,
130 Enforcer: enforcer,
131 IdResolver: res,
132 Config: config,
133 Logger: log.SubLogger(logger, "ingester"),
134 Validator: validator,
135 }
136 err = jc.StartJetstream(ctx, ingester.Ingest())
137 if err != nil {
138 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
139 }
140
141 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
142 if err != nil {
143 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
144 }
145 knotstream.Start(ctx)
146
147 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
148 if err != nil {
149 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
150 }
151 spindlestream.Start(ctx)
152
153 var notifiers []notify.Notifier
154
155 // Always add the database notifier
156 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
157
158 // Add other notifiers in production only
159 if !config.Core.Dev {
160 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
161 }
162 notifier := notify.NewMergedNotifier(notifiers...)
163
164 state := &State{
165 d,
166 notifier,
167 oauth,
168 enforcer,
169 pages,
170 res,
171 posthog,
172 jc,
173 config,
174 repoResolver,
175 knotstream,
176 spindlestream,
177 logger,
178 validator,
179 }
180
181 return state, nil
182}
183
184func (s *State) Close() error {
185 // other close up logic goes here
186 return s.db.Close()
187}
188
189func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
190 w.Header().Set("Content-Type", "image/svg+xml")
191 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
192 w.Header().Set("ETag", `"favicon-svg-v1"`)
193
194 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
195 w.WriteHeader(http.StatusNotModified)
196 return
197 }
198
199 s.pages.Favicon(w)
200}
201
202func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
203 w.Header().Set("Content-Type", "text/plain")
204 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
205
206 robotsTxt := `User-agent: *
207Allow: /
208`
209 w.Write([]byte(robotsTxt))
210}
211
// manifestJson is the web app manifest document that PWAManifest writes to the
// response verbatim. It names the app, points at /favicon.svg as its icon and
// sets the start URL, id, display mode and colors.
//
// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest
const manifestJson = `{
 "name": "tangled",
 "description": "tightly-knit social coding.",
 "icons": [
 {
 "src": "/favicon.svg",
 "sizes": "144x144"
 }
 ],
 "start_url": "/",
 "id": "org.tangled",

 "display": "standalone",
 "background_color": "#111827",
 "theme_color": "#111827"
}`
229
230func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) {
231 w.Header().Set("Content-Type", "application/json")
232 w.Write([]byte(manifestJson))
233}
234
235func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
236 user := s.oauth.GetUser(r)
237 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
238 LoggedInUser: user,
239 })
240}
241
242func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
243 user := s.oauth.GetUser(r)
244 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
245 LoggedInUser: user,
246 })
247}
248
249func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
250 user := s.oauth.GetUser(r)
251 s.pages.Brand(w, pages.BrandParams{
252 LoggedInUser: user,
253 })
254}
255
256func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
257 if s.oauth.GetUser(r) != nil {
258 s.Timeline(w, r)
259 return
260 }
261 s.Home(w, r)
262}
263
264func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
265 user := s.oauth.GetUser(r)
266
267 // TODO: set this flag based on the UI
268 filtered := false
269
270 var userDid string
271 if user != nil {
272 userDid = user.Did
273 }
274 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
275 if err != nil {
276 s.logger.Error("failed to make timeline", "err", err)
277 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
278 }
279
280 repos, err := db.GetTopStarredReposLastWeek(s.db)
281 if err != nil {
282 s.logger.Error("failed to get top starred repos", "err", err)
283 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
284 return
285 }
286
287 gfiLabel, err := db.GetLabelDefinition(s.db, db.FilterEq("at_uri", models.LabelGoodFirstIssue))
288 if err != nil {
289 // non-fatal
290 }
291
292 s.pages.Timeline(w, pages.TimelineParams{
293 LoggedInUser: user,
294 Timeline: timeline,
295 Repos: repos,
296 GfiLabel: gfiLabel,
297 })
298}
299
300func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
301 user := s.oauth.GetUser(r)
302 if user == nil {
303 return
304 }
305
306 l := s.logger.With("handler", "UpgradeBanner")
307 l = l.With("did", user.Did)
308
309 regs, err := db.GetRegistrations(
310 s.db,
311 db.FilterEq("did", user.Did),
312 db.FilterEq("needs_upgrade", 1),
313 )
314 if err != nil {
315 l.Error("non-fatal: failed to get registrations", "err", err)
316 }
317
318 spindles, err := db.GetSpindles(
319 s.db,
320 db.FilterEq("owner", user.Did),
321 db.FilterEq("needs_upgrade", 1),
322 )
323 if err != nil {
324 l.Error("non-fatal: failed to get spindles", "err", err)
325 }
326
327 if regs == nil && spindles == nil {
328 return
329 }
330
331 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
332 Registrations: regs,
333 Spindles: spindles,
334 })
335}
336
337func (s *State) Home(w http.ResponseWriter, r *http.Request) {
338 // TODO: set this flag based on the UI
339 filtered := false
340
341 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
342 if err != nil {
343 s.logger.Error("failed to make timeline", "err", err)
344 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
345 return
346 }
347
348 repos, err := db.GetTopStarredReposLastWeek(s.db)
349 if err != nil {
350 s.logger.Error("failed to get top starred repos", "err", err)
351 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
352 return
353 }
354
355 s.pages.Home(w, pages.TimelineParams{
356 LoggedInUser: nil,
357 Timeline: timeline,
358 Repos: repos,
359 })
360}
361
362func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
363 user := chi.URLParam(r, "user")
364 user = strings.TrimPrefix(user, "@")
365
366 if user == "" {
367 w.WriteHeader(http.StatusBadRequest)
368 return
369 }
370
371 id, err := s.idResolver.ResolveIdent(r.Context(), user)
372 if err != nil {
373 w.WriteHeader(http.StatusInternalServerError)
374 return
375 }
376
377 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
378 if err != nil {
379 w.WriteHeader(http.StatusNotFound)
380 return
381 }
382
383 if len(pubKeys) == 0 {
384 w.WriteHeader(http.StatusNotFound)
385 return
386 }
387
388 for _, k := range pubKeys {
389 key := strings.TrimRight(k.Key, "\n")
390 fmt.Fprintln(w, key)
391 }
392}
393
// validateRepoName reports whether name is acceptable as a repository name.
//
// A valid name is non-empty, consists only of ASCII letters, digits, '-', '_'
// and '.', does not start or end with '.', and contains no "..". Path
// separators ('/' and '\\') are rejected up front so the name can never be
// used for path traversal.
//
// It returns nil for a valid name and otherwise an error whose message
// describes the first rule that was violated. The messages are shown to the
// user as-is, which is why they are capitalized.
func validateRepoName(name string) error {
	if name == "" {
		return errors.New("Repository name cannot be empty")
	}

	// check for path traversal attempts
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return errors.New("Repository name contains invalid path characters")
	}

	// leading or trailing dots could be used for traversal when normalized;
	// "./" and "../" need no separate check because '/' was rejected above
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return errors.New("Repository name contains invalid path sequence")
	}

	// then continue with character validation
	for _, char := range name {
		switch {
		case char >= 'a' && char <= 'z':
		case char >= 'A' && char <= 'Z':
		case char >= '0' && char <= '9':
		case char == '-', char == '_', char == '.':
		default:
			return errors.New("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// additional check to prevent multiple sequential dots
	if strings.Contains(name, "..") {
		return errors.New("Repository name cannot contain sequential dots")
	}

	// if all checks pass
	return nil
}
425
// stripGitExt returns name without a single trailing ".git"; names that do not
// end in ".git" are returned unchanged.
func stripGitExt(name string) string {
	const ext = ".git"
	if !strings.HasSuffix(name, ext) {
		return name
	}
	return name[:len(name)-len(ext)]
}
429
430func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
431 switch r.Method {
432 case http.MethodGet:
433 user := s.oauth.GetUser(r)
434 knots, err := s.enforcer.GetKnotsForUser(user.Did)
435 if err != nil {
436 s.pages.Notice(w, "repo", "Invalid user account.")
437 return
438 }
439
440 s.pages.NewRepo(w, pages.NewRepoParams{
441 LoggedInUser: user,
442 Knots: knots,
443 })
444
445 case http.MethodPost:
446 l := s.logger.With("handler", "NewRepo")
447
448 user := s.oauth.GetUser(r)
449 l = l.With("did", user.Did)
450
451 // form validation
452 domain := r.FormValue("domain")
453 if domain == "" {
454 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
455 return
456 }
457 l = l.With("knot", domain)
458
459 repoName := r.FormValue("name")
460 if repoName == "" {
461 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
462 return
463 }
464
465 if err := validateRepoName(repoName); err != nil {
466 s.pages.Notice(w, "repo", err.Error())
467 return
468 }
469 repoName = stripGitExt(repoName)
470 l = l.With("repoName", repoName)
471
472 defaultBranch := r.FormValue("branch")
473 if defaultBranch == "" {
474 defaultBranch = "main"
475 }
476 l = l.With("defaultBranch", defaultBranch)
477
478 description := r.FormValue("description")
479
480 // ACL validation
481 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
482 if err != nil || !ok {
483 l.Info("unauthorized")
484 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
485 return
486 }
487
488 // Check for existing repos
489 existingRepo, err := db.GetRepo(
490 s.db,
491 db.FilterEq("did", user.Did),
492 db.FilterEq("name", repoName),
493 )
494 if err == nil && existingRepo != nil {
495 l.Info("repo exists")
496 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
497 return
498 }
499
500 // create atproto record for this repo
501 rkey := tid.TID()
502 repo := &models.Repo{
503 Did: user.Did,
504 Name: repoName,
505 Knot: domain,
506 Rkey: rkey,
507 Description: description,
508 Created: time.Now(),
509 Labels: models.DefaultLabelDefs(),
510 }
511 record := repo.AsRecord()
512
513 atpClient, err := s.oauth.AuthorizedClient(r)
514 if err != nil {
515 l.Info("PDS write failed", "err", err)
516 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
517 return
518 }
519
520 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
521 Collection: tangled.RepoNSID,
522 Repo: user.Did,
523 Rkey: rkey,
524 Record: &lexutil.LexiconTypeDecoder{
525 Val: &record,
526 },
527 })
528 if err != nil {
529 l.Info("PDS write failed", "err", err)
530 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
531 return
532 }
533
534 aturi := atresp.Uri
535 l = l.With("aturi", aturi)
536 l.Info("wrote to PDS")
537
538 tx, err := s.db.BeginTx(r.Context(), nil)
539 if err != nil {
540 l.Info("txn failed", "err", err)
541 s.pages.Notice(w, "repo", "Failed to save repository information.")
542 return
543 }
544
545 // The rollback function reverts a few things on failure:
546 // - the pending txn
547 // - the ACLs
548 // - the atproto record created
549 rollback := func() {
550 err1 := tx.Rollback()
551 err2 := s.enforcer.E.LoadPolicy()
552 err3 := rollbackRecord(context.Background(), aturi, atpClient)
553
554 // ignore txn complete errors, this is okay
555 if errors.Is(err1, sql.ErrTxDone) {
556 err1 = nil
557 }
558
559 if errs := errors.Join(err1, err2, err3); errs != nil {
560 l.Error("failed to rollback changes", "errs", errs)
561 return
562 }
563 }
564 defer rollback()
565
566 client, err := s.oauth.ServiceClient(
567 r,
568 oauth.WithService(domain),
569 oauth.WithLxm(tangled.RepoCreateNSID),
570 oauth.WithDev(s.config.Core.Dev),
571 )
572 if err != nil {
573 l.Error("service auth failed", "err", err)
574 s.pages.Notice(w, "repo", "Failed to reach PDS.")
575 return
576 }
577
578 xe := tangled.RepoCreate(
579 r.Context(),
580 client,
581 &tangled.RepoCreate_Input{
582 Rkey: rkey,
583 },
584 )
585 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
586 l.Error("xrpc error", "xe", xe)
587 s.pages.Notice(w, "repo", err.Error())
588 return
589 }
590
591 err = db.AddRepo(tx, repo)
592 if err != nil {
593 l.Error("db write failed", "err", err)
594 s.pages.Notice(w, "repo", "Failed to save repository information.")
595 return
596 }
597
598 // acls
599 p, _ := securejoin.SecureJoin(user.Did, repoName)
600 err = s.enforcer.AddRepo(user.Did, domain, p)
601 if err != nil {
602 l.Error("acl setup failed", "err", err)
603 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
604 return
605 }
606
607 err = tx.Commit()
608 if err != nil {
609 l.Error("txn commit failed", "err", err)
610 http.Error(w, err.Error(), http.StatusInternalServerError)
611 return
612 }
613
614 err = s.enforcer.E.SavePolicy()
615 if err != nil {
616 l.Error("acl save failed", "err", err)
617 http.Error(w, err.Error(), http.StatusInternalServerError)
618 return
619 }
620
621 // reset the ATURI because the transaction completed successfully
622 aturi = ""
623
624 s.notifier.NewRepo(r.Context(), repo)
625 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Did, repoName))
626 }
627}
628
629// this is used to rollback changes made to the PDS
630//
631// it is a no-op if the provided ATURI is empty
632func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
633 if aturi == "" {
634 return nil
635 }
636
637 parsed := syntax.ATURI(aturi)
638
639 collection := parsed.Collection().String()
640 repo := parsed.Authority().String()
641 rkey := parsed.RecordKey().String()
642
643 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
644 Collection: collection,
645 Repo: repo,
646 Rkey: rkey,
647 })
648 return err
649}
650
651func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error {
652 defaults := models.DefaultLabelDefs()
653 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults))
654 if err != nil {
655 return err
656 }
657 // already present
658 if len(defaultLabels) == len(defaults) {
659 return nil
660 }
661
662 labelDefs, err := models.FetchDefaultDefs(r)
663 if err != nil {
664 return err
665 }
666
667 // Insert each label definition to the database
668 for _, labelDef := range labelDefs {
669 _, err = db.AddLabelDefinition(e, &labelDef)
670 if err != nil {
671 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
672 }
673 }
674
675 return nil
676}