1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 securejoin "github.com/cyphar/filepath-securejoin"
18 "github.com/go-chi/chi/v5"
19 "github.com/posthog/posthog-go"
20 "tangled.org/core/api/tangled"
21 "tangled.org/core/appview"
22 "tangled.org/core/appview/cache"
23 "tangled.org/core/appview/cache/session"
24 "tangled.org/core/appview/config"
25 "tangled.org/core/appview/db"
26 "tangled.org/core/appview/models"
27 "tangled.org/core/appview/notify"
28 dbnotify "tangled.org/core/appview/notify/db"
29 phnotify "tangled.org/core/appview/notify/posthog"
30 "tangled.org/core/appview/oauth"
31 "tangled.org/core/appview/pages"
32 "tangled.org/core/appview/reporesolver"
33 "tangled.org/core/appview/validator"
34 xrpcclient "tangled.org/core/appview/xrpcclient"
35 "tangled.org/core/eventconsumer"
36 "tangled.org/core/idresolver"
37 "tangled.org/core/jetstream"
38 tlog "tangled.org/core/log"
39 "tangled.org/core/rbac"
40 "tangled.org/core/tid"
41)
42
43type State struct {
44 db *db.DB
45 notifier notify.Notifier
46 oauth *oauth.OAuth
47 enforcer *rbac.Enforcer
48 pages *pages.Pages
49 sess *session.SessionStore
50 idResolver *idresolver.Resolver
51 posthog posthog.Client
52 jc *jetstream.JetstreamClient
53 config *config.Config
54 repoResolver *reporesolver.RepoResolver
55 knotstream *eventconsumer.Consumer
56 spindlestream *eventconsumer.Consumer
57 logger *slog.Logger
58 validator *validator.Validator
59}
60
61func Make(ctx context.Context, config *config.Config) (*State, error) {
62 d, err := db.Make(config.Core.DbPath)
63 if err != nil {
64 return nil, fmt.Errorf("failed to create db: %w", err)
65 }
66
67 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
68 if err != nil {
69 return nil, fmt.Errorf("failed to create enforcer: %w", err)
70 }
71
72 res, err := idresolver.RedisResolver(config.Redis.ToURL())
73 if err != nil {
74 log.Printf("failed to create redis resolver: %v", err)
75 res = idresolver.DefaultResolver()
76 }
77
78 pgs := pages.NewPages(config, res)
79 cache := cache.New(config.Redis.Addr)
80 sess := session.New(cache)
81 oauth := oauth.NewOAuth(config, sess)
82 validator := validator.New(d, res, enforcer)
83
84 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
85 if err != nil {
86 return nil, fmt.Errorf("failed to create posthog client: %w", err)
87 }
88
89 repoResolver := reporesolver.New(config, enforcer, res, d)
90
91 wrapper := db.DbWrapper{Execer: d}
92 jc, err := jetstream.NewJetstreamClient(
93 config.Jetstream.Endpoint,
94 "appview",
95 []string{
96 tangled.GraphFollowNSID,
97 tangled.FeedStarNSID,
98 tangled.PublicKeyNSID,
99 tangled.RepoArtifactNSID,
100 tangled.ActorProfileNSID,
101 tangled.SpindleMemberNSID,
102 tangled.SpindleNSID,
103 tangled.StringNSID,
104 tangled.RepoIssueNSID,
105 tangled.RepoIssueCommentNSID,
106 tangled.LabelDefinitionNSID,
107 tangled.LabelOpNSID,
108 },
109 nil,
110 slog.Default(),
111 wrapper,
112 false,
113
114 // in-memory filter is inapplicalble to appview so
115 // we'll never log dids anyway.
116 false,
117 )
118 if err != nil {
119 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
120 }
121
122 if err := BackfillDefaultDefs(d, res); err != nil {
123 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
124 }
125
126 ingester := appview.Ingester{
127 Db: wrapper,
128 Enforcer: enforcer,
129 IdResolver: res,
130 Config: config,
131 Logger: tlog.New("ingester"),
132 Validator: validator,
133 }
134 err = jc.StartJetstream(ctx, ingester.Ingest())
135 if err != nil {
136 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
137 }
138
139 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
140 if err != nil {
141 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
142 }
143 knotstream.Start(ctx)
144
145 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
146 if err != nil {
147 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
148 }
149 spindlestream.Start(ctx)
150
151 var notifiers []notify.Notifier
152
153 // Always add the database notifier
154 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
155
156 // Add other notifiers in production only
157 if !config.Core.Dev {
158 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
159 }
160 notifier := notify.NewMergedNotifier(notifiers...)
161
162 state := &State{
163 d,
164 notifier,
165 oauth,
166 enforcer,
167 pgs,
168 sess,
169 res,
170 posthog,
171 jc,
172 config,
173 repoResolver,
174 knotstream,
175 spindlestream,
176 slog.Default(),
177 validator,
178 }
179
180 return state, nil
181}
182
183func (s *State) Close() error {
184 // other close up logic goes here
185 return s.db.Close()
186}
187
188func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
189 w.Header().Set("Content-Type", "image/svg+xml")
190 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
191 w.Header().Set("ETag", `"favicon-svg-v1"`)
192
193 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
194 w.WriteHeader(http.StatusNotModified)
195 return
196 }
197
198 s.pages.Favicon(w)
199}
200
201// https://developer.mozilla.org/en-US/docs/Web/Progressive_web_apps/Manifest
202const manifestJson = `{
203 "name": "tangled",
204 "description": "tightly-knit social coding.",
205 "icons": [
206 {
207 "src": "/favicon.svg",
208 "sizes": "144x144"
209 }
210 ],
211 "start_url": "/",
212 "id": "org.tangled",
213
214 "display": "standalone",
215 "background_color": "#111827",
216 "theme_color": "#111827"
217}`
218
219func (p *State) PWAManifest(w http.ResponseWriter, r *http.Request) {
220 w.Header().Set("Content-Type", "application/json")
221 w.Write([]byte(manifestJson))
222}
223
224func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
225 user := s.oauth.GetUser(r)
226 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
227 LoggedInUser: user,
228 })
229}
230
231func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
232 user := s.oauth.GetUser(r)
233 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
234 LoggedInUser: user,
235 })
236}
237
238func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
239 user := s.oauth.GetUser(r)
240 s.pages.Brand(w, pages.BrandParams{
241 LoggedInUser: user,
242 })
243}
244
245func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
246 if s.oauth.GetUser(r) != nil {
247 s.Timeline(w, r)
248 return
249 }
250 s.Home(w, r)
251}
252
253func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
254 user := s.oauth.GetUser(r)
255
256 var userDid string
257 if user != nil {
258 userDid = user.Did
259 }
260 timeline, err := db.MakeTimeline(s.db, 50, userDid)
261 if err != nil {
262 log.Println(err)
263 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
264 }
265
266 repos, err := db.GetTopStarredReposLastWeek(s.db)
267 if err != nil {
268 log.Println(err)
269 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
270 return
271 }
272
273 s.pages.Timeline(w, pages.TimelineParams{
274 LoggedInUser: user,
275 Timeline: timeline,
276 Repos: repos,
277 })
278}
279
280func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
281 user := s.oauth.GetUser(r)
282 if user == nil {
283 return
284 }
285
286 l := s.logger.With("handler", "UpgradeBanner")
287 l = l.With("did", user.Did)
288 l = l.With("handle", user.Handle)
289
290 regs, err := db.GetRegistrations(
291 s.db,
292 db.FilterEq("did", user.Did),
293 db.FilterEq("needs_upgrade", 1),
294 )
295 if err != nil {
296 l.Error("non-fatal: failed to get registrations", "err", err)
297 }
298
299 spindles, err := db.GetSpindles(
300 s.db,
301 db.FilterEq("owner", user.Did),
302 db.FilterEq("needs_upgrade", 1),
303 )
304 if err != nil {
305 l.Error("non-fatal: failed to get spindles", "err", err)
306 }
307
308 if regs == nil && spindles == nil {
309 return
310 }
311
312 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
313 Registrations: regs,
314 Spindles: spindles,
315 })
316}
317
318func (s *State) Home(w http.ResponseWriter, r *http.Request) {
319 timeline, err := db.MakeTimeline(s.db, 5, "")
320 if err != nil {
321 log.Println(err)
322 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
323 return
324 }
325
326 repos, err := db.GetTopStarredReposLastWeek(s.db)
327 if err != nil {
328 log.Println(err)
329 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
330 return
331 }
332
333 s.pages.Home(w, pages.TimelineParams{
334 LoggedInUser: nil,
335 Timeline: timeline,
336 Repos: repos,
337 })
338}
339
340func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
341 user := chi.URLParam(r, "user")
342 user = strings.TrimPrefix(user, "@")
343
344 if user == "" {
345 w.WriteHeader(http.StatusBadRequest)
346 return
347 }
348
349 id, err := s.idResolver.ResolveIdent(r.Context(), user)
350 if err != nil {
351 w.WriteHeader(http.StatusInternalServerError)
352 return
353 }
354
355 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
356 if err != nil {
357 w.WriteHeader(http.StatusNotFound)
358 return
359 }
360
361 if len(pubKeys) == 0 {
362 w.WriteHeader(http.StatusNotFound)
363 return
364 }
365
366 for _, k := range pubKeys {
367 key := strings.TrimRight(k.Key, "\n")
368 fmt.Fprintln(w, key)
369 }
370}
371
372func validateRepoName(name string) error {
373 // check for path traversal attempts
374 if name == "." || name == ".." ||
375 strings.Contains(name, "/") || strings.Contains(name, "\\") {
376 return fmt.Errorf("Repository name contains invalid path characters")
377 }
378
379 // check for sequences that could be used for traversal when normalized
380 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
381 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
382 return fmt.Errorf("Repository name contains invalid path sequence")
383 }
384
385 // then continue with character validation
386 for _, char := range name {
387 if !((char >= 'a' && char <= 'z') ||
388 (char >= 'A' && char <= 'Z') ||
389 (char >= '0' && char <= '9') ||
390 char == '-' || char == '_' || char == '.') {
391 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
392 }
393 }
394
395 // additional check to prevent multiple sequential dots
396 if strings.Contains(name, "..") {
397 return fmt.Errorf("Repository name cannot contain sequential dots")
398 }
399
400 // if all checks pass
401 return nil
402}
403
404func stripGitExt(name string) string {
405 return strings.TrimSuffix(name, ".git")
406}
407
408func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
409 switch r.Method {
410 case http.MethodGet:
411 user := s.oauth.GetUser(r)
412 knots, err := s.enforcer.GetKnotsForUser(user.Did)
413 if err != nil {
414 s.pages.Notice(w, "repo", "Invalid user account.")
415 return
416 }
417
418 s.pages.NewRepo(w, pages.NewRepoParams{
419 LoggedInUser: user,
420 Knots: knots,
421 })
422
423 case http.MethodPost:
424 l := s.logger.With("handler", "NewRepo")
425
426 user := s.oauth.GetUser(r)
427 l = l.With("did", user.Did)
428 l = l.With("handle", user.Handle)
429
430 // form validation
431 domain := r.FormValue("domain")
432 if domain == "" {
433 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
434 return
435 }
436 l = l.With("knot", domain)
437
438 repoName := r.FormValue("name")
439 if repoName == "" {
440 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
441 return
442 }
443
444 if err := validateRepoName(repoName); err != nil {
445 s.pages.Notice(w, "repo", err.Error())
446 return
447 }
448 repoName = stripGitExt(repoName)
449 l = l.With("repoName", repoName)
450
451 defaultBranch := r.FormValue("branch")
452 if defaultBranch == "" {
453 defaultBranch = "main"
454 }
455 l = l.With("defaultBranch", defaultBranch)
456
457 description := r.FormValue("description")
458
459 // ACL validation
460 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
461 if err != nil || !ok {
462 l.Info("unauthorized")
463 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
464 return
465 }
466
467 // Check for existing repos
468 existingRepo, err := db.GetRepo(
469 s.db,
470 db.FilterEq("did", user.Did),
471 db.FilterEq("name", repoName),
472 )
473 if err == nil && existingRepo != nil {
474 l.Info("repo exists")
475 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
476 return
477 }
478
479 // create atproto record for this repo
480 rkey := tid.TID()
481 repo := &models.Repo{
482 Did: user.Did,
483 Name: repoName,
484 Knot: domain,
485 Rkey: rkey,
486 Description: description,
487 Created: time.Now(),
488 Labels: models.DefaultLabelDefs(),
489 }
490 record := repo.AsRecord()
491
492 xrpcClient, err := s.oauth.AuthorizedClient(r)
493 if err != nil {
494 l.Info("PDS write failed", "err", err)
495 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
496 return
497 }
498
499 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
500 Collection: tangled.RepoNSID,
501 Repo: user.Did,
502 Rkey: rkey,
503 Record: &lexutil.LexiconTypeDecoder{
504 Val: &record,
505 },
506 })
507 if err != nil {
508 l.Info("PDS write failed", "err", err)
509 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
510 return
511 }
512
513 aturi := atresp.Uri
514 l = l.With("aturi", aturi)
515 l.Info("wrote to PDS")
516
517 tx, err := s.db.BeginTx(r.Context(), nil)
518 if err != nil {
519 l.Info("txn failed", "err", err)
520 s.pages.Notice(w, "repo", "Failed to save repository information.")
521 return
522 }
523
524 // The rollback function reverts a few things on failure:
525 // - the pending txn
526 // - the ACLs
527 // - the atproto record created
528 rollback := func() {
529 err1 := tx.Rollback()
530 err2 := s.enforcer.E.LoadPolicy()
531 err3 := rollbackRecord(context.Background(), aturi, xrpcClient)
532
533 // ignore txn complete errors, this is okay
534 if errors.Is(err1, sql.ErrTxDone) {
535 err1 = nil
536 }
537
538 if errs := errors.Join(err1, err2, err3); errs != nil {
539 l.Error("failed to rollback changes", "errs", errs)
540 return
541 }
542 }
543 defer rollback()
544
545 client, err := s.oauth.ServiceClient(
546 r,
547 oauth.WithService(domain),
548 oauth.WithLxm(tangled.RepoCreateNSID),
549 oauth.WithDev(s.config.Core.Dev),
550 )
551 if err != nil {
552 l.Error("service auth failed", "err", err)
553 s.pages.Notice(w, "repo", "Failed to reach PDS.")
554 return
555 }
556
557 xe := tangled.RepoCreate(
558 r.Context(),
559 client,
560 &tangled.RepoCreate_Input{
561 Rkey: rkey,
562 },
563 )
564 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
565 l.Error("xrpc error", "xe", xe)
566 s.pages.Notice(w, "repo", err.Error())
567 return
568 }
569
570 err = db.AddRepo(tx, repo)
571 if err != nil {
572 l.Error("db write failed", "err", err)
573 s.pages.Notice(w, "repo", "Failed to save repository information.")
574 return
575 }
576
577 // acls
578 p, _ := securejoin.SecureJoin(user.Did, repoName)
579 err = s.enforcer.AddRepo(user.Did, domain, p)
580 if err != nil {
581 l.Error("acl setup failed", "err", err)
582 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
583 return
584 }
585
586 err = tx.Commit()
587 if err != nil {
588 l.Error("txn commit failed", "err", err)
589 http.Error(w, err.Error(), http.StatusInternalServerError)
590 return
591 }
592
593 err = s.enforcer.E.SavePolicy()
594 if err != nil {
595 l.Error("acl save failed", "err", err)
596 http.Error(w, err.Error(), http.StatusInternalServerError)
597 return
598 }
599
600 // reset the ATURI because the transaction completed successfully
601 aturi = ""
602
603 s.notifier.NewRepo(r.Context(), repo)
604 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
605 }
606}
607
608// this is used to rollback changes made to the PDS
609//
610// it is a no-op if the provided ATURI is empty
611func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
612 if aturi == "" {
613 return nil
614 }
615
616 parsed := syntax.ATURI(aturi)
617
618 collection := parsed.Collection().String()
619 repo := parsed.Authority().String()
620 rkey := parsed.RecordKey().String()
621
622 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
623 Collection: collection,
624 Repo: repo,
625 Rkey: rkey,
626 })
627 return err
628}
629
630func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver) error {
631 defaults := models.DefaultLabelDefs()
632 defaultLabels, err := db.GetLabelDefinitions(e, db.FilterIn("at_uri", defaults))
633 if err != nil {
634 return err
635 }
636 // already present
637 if len(defaultLabels) == len(defaults) {
638 return nil
639 }
640
641 labelDefs, err := models.FetchDefaultDefs(r)
642 if err != nil {
643 return err
644 }
645
646 // Insert each label definition to the database
647 for _, labelDef := range labelDefs {
648 _, err = db.AddLabelDefinition(e, &labelDef)
649 if err != nil {
650 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
651 }
652 }
653
654 return nil
655}