forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state
2
3import (
4 "context"
5 "crypto/hmac"
6 "crypto/sha256"
7 "encoding/hex"
8 "fmt"
9 "log"
10 "log/slog"
11 "net/http"
12 "strings"
13 "time"
14
15 comatproto "github.com/bluesky-social/indigo/api/atproto"
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 lexutil "github.com/bluesky-social/indigo/lex/util"
18 securejoin "github.com/cyphar/filepath-securejoin"
19 "github.com/go-chi/chi/v5"
20 "github.com/posthog/posthog-go"
21 "tangled.sh/tangled.sh/core/api/tangled"
22 "tangled.sh/tangled.sh/core/appview"
23 "tangled.sh/tangled.sh/core/appview/cache"
24 "tangled.sh/tangled.sh/core/appview/cache/session"
25 "tangled.sh/tangled.sh/core/appview/config"
26 "tangled.sh/tangled.sh/core/appview/db"
27 "tangled.sh/tangled.sh/core/appview/idresolver"
28 "tangled.sh/tangled.sh/core/appview/oauth"
29 "tangled.sh/tangled.sh/core/appview/pages"
30 "tangled.sh/tangled.sh/core/appview/reporesolver"
31 "tangled.sh/tangled.sh/core/eventconsumer"
32 "tangled.sh/tangled.sh/core/jetstream"
33 "tangled.sh/tangled.sh/core/knotclient"
34 "tangled.sh/tangled.sh/core/rbac"
35)
36
37type State struct {
38 db *db.DB
39 oauth *oauth.OAuth
40 enforcer *rbac.Enforcer
41 tidClock syntax.TIDClock
42 pages *pages.Pages
43 sess *session.SessionStore
44 idResolver *idresolver.Resolver
45 posthog posthog.Client
46 jc *jetstream.JetstreamClient
47 config *config.Config
48 repoResolver *reporesolver.RepoResolver
49 knotstream *eventconsumer.Consumer
50 spindlestream *eventconsumer.Consumer
51}
52
53func Make(ctx context.Context, config *config.Config) (*State, error) {
54 d, err := db.Make(config.Core.DbPath)
55 if err != nil {
56 return nil, err
57 }
58
59 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
60 if err != nil {
61 return nil, err
62 }
63
64 clock := syntax.NewTIDClock(0)
65
66 pgs := pages.NewPages(config)
67
68 res, err := idresolver.RedisResolver(config.Redis)
69 if err != nil {
70 log.Printf("failed to create redis resolver: %v", err)
71 res = idresolver.DefaultResolver()
72 }
73
74 cache := cache.New(config.Redis.Addr)
75 sess := session.New(cache)
76
77 oauth := oauth.NewOAuth(config, sess)
78
79 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
80 if err != nil {
81 return nil, fmt.Errorf("failed to create posthog client: %w", err)
82 }
83
84 repoResolver := reporesolver.New(config, enforcer, res, d)
85
86 wrapper := db.DbWrapper{d}
87 jc, err := jetstream.NewJetstreamClient(
88 config.Jetstream.Endpoint,
89 "appview",
90 []string{
91 tangled.GraphFollowNSID,
92 tangled.FeedStarNSID,
93 tangled.PublicKeyNSID,
94 tangled.RepoArtifactNSID,
95 tangled.ActorProfileNSID,
96 },
97 nil,
98 slog.Default(),
99 wrapper,
100 false,
101
102 // in-memory filter is inapplicalble to appview so
103 // we'll never log dids anyway.
104 false,
105 )
106 if err != nil {
107 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
108 }
109 err = jc.StartJetstream(ctx, appview.Ingest(wrapper, enforcer))
110 if err != nil {
111 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
112 }
113
114 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
115 if err != nil {
116 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
117 }
118 knotstream.Start(ctx)
119
120 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
121 if err != nil {
122 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
123 }
124 spindlestream.Start(ctx)
125
126 state := &State{
127 d,
128 oauth,
129 enforcer,
130 clock,
131 pgs,
132 sess,
133 res,
134 posthog,
135 jc,
136 config,
137 repoResolver,
138 knotstream,
139 spindlestream,
140 }
141
142 return state, nil
143}
144
145func TID(c *syntax.TIDClock) string {
146 return c.Next().String()
147}
148
149func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
150 user := s.oauth.GetUser(r)
151
152 timeline, err := db.MakeTimeline(s.db)
153 if err != nil {
154 log.Println(err)
155 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
156 }
157
158 var didsToResolve []string
159 for _, ev := range timeline {
160 if ev.Repo != nil {
161 didsToResolve = append(didsToResolve, ev.Repo.Did)
162 if ev.Source != nil {
163 didsToResolve = append(didsToResolve, ev.Source.Did)
164 }
165 }
166 if ev.Follow != nil {
167 didsToResolve = append(didsToResolve, ev.Follow.UserDid, ev.Follow.SubjectDid)
168 }
169 if ev.Star != nil {
170 didsToResolve = append(didsToResolve, ev.Star.StarredByDid, ev.Star.Repo.Did)
171 }
172 }
173
174 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve)
175 didHandleMap := make(map[string]string)
176 for _, identity := range resolvedIds {
177 if !identity.Handle.IsInvalidHandle() {
178 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String())
179 } else {
180 didHandleMap[identity.DID.String()] = identity.DID.String()
181 }
182 }
183
184 s.pages.Timeline(w, pages.TimelineParams{
185 LoggedInUser: user,
186 Timeline: timeline,
187 DidHandleMap: didHandleMap,
188 })
189
190 return
191}
192
193// requires auth
194func (s *State) RegistrationKey(w http.ResponseWriter, r *http.Request) {
195 switch r.Method {
196 case http.MethodGet:
197 // list open registrations under this did
198
199 return
200 case http.MethodPost:
201 session, err := s.oauth.Stores().Get(r, oauth.SessionName)
202 if err != nil || session.IsNew {
203 log.Println("unauthorized attempt to generate registration key")
204 http.Error(w, "Forbidden", http.StatusUnauthorized)
205 return
206 }
207
208 did := session.Values[oauth.SessionDid].(string)
209
210 // check if domain is valid url, and strip extra bits down to just host
211 domain := r.FormValue("domain")
212 if domain == "" {
213 http.Error(w, "Invalid form", http.StatusBadRequest)
214 return
215 }
216
217 key, err := db.GenerateRegistrationKey(s.db, domain, did)
218
219 if err != nil {
220 log.Println(err)
221 http.Error(w, "unable to register this domain", http.StatusNotAcceptable)
222 return
223 }
224
225 w.Write([]byte(key))
226 }
227}
228
229func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
230 user := chi.URLParam(r, "user")
231 user = strings.TrimPrefix(user, "@")
232
233 if user == "" {
234 w.WriteHeader(http.StatusBadRequest)
235 return
236 }
237
238 id, err := s.idResolver.ResolveIdent(r.Context(), user)
239 if err != nil {
240 w.WriteHeader(http.StatusInternalServerError)
241 return
242 }
243
244 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
245 if err != nil {
246 w.WriteHeader(http.StatusNotFound)
247 return
248 }
249
250 if len(pubKeys) == 0 {
251 w.WriteHeader(http.StatusNotFound)
252 return
253 }
254
255 for _, k := range pubKeys {
256 key := strings.TrimRight(k.Key, "\n")
257 w.Write([]byte(fmt.Sprintln(key)))
258 }
259}
260
261// create a signed request and check if a node responds to that
262func (s *State) InitKnotServer(w http.ResponseWriter, r *http.Request) {
263 user := s.oauth.GetUser(r)
264
265 domain := chi.URLParam(r, "domain")
266 if domain == "" {
267 http.Error(w, "malformed url", http.StatusBadRequest)
268 return
269 }
270 log.Println("checking ", domain)
271
272 secret, err := db.GetRegistrationKey(s.db, domain)
273 if err != nil {
274 log.Printf("no key found for domain %s: %s\n", domain, err)
275 return
276 }
277
278 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev)
279 if err != nil {
280 log.Println("failed to create client to ", domain)
281 }
282
283 resp, err := client.Init(user.Did)
284 if err != nil {
285 w.Write([]byte("no dice"))
286 log.Println("domain was unreachable after 5 seconds")
287 return
288 }
289
290 if resp.StatusCode == http.StatusConflict {
291 log.Println("status conflict", resp.StatusCode)
292 w.Write([]byte("already registered, sorry!"))
293 return
294 }
295
296 if resp.StatusCode != http.StatusNoContent {
297 log.Println("status nok", resp.StatusCode)
298 w.Write([]byte("no dice"))
299 return
300 }
301
302 // verify response mac
303 signature := resp.Header.Get("X-Signature")
304 signatureBytes, err := hex.DecodeString(signature)
305 if err != nil {
306 return
307 }
308
309 expectedMac := hmac.New(sha256.New, []byte(secret))
310 expectedMac.Write([]byte("ok"))
311
312 if !hmac.Equal(expectedMac.Sum(nil), signatureBytes) {
313 log.Printf("response body signature mismatch: %x\n", signatureBytes)
314 return
315 }
316
317 tx, err := s.db.BeginTx(r.Context(), nil)
318 if err != nil {
319 log.Println("failed to start tx", err)
320 http.Error(w, err.Error(), http.StatusInternalServerError)
321 return
322 }
323 defer func() {
324 tx.Rollback()
325 err = s.enforcer.E.LoadPolicy()
326 if err != nil {
327 log.Println("failed to rollback policies")
328 }
329 }()
330
331 // mark as registered
332 err = db.Register(tx, domain)
333 if err != nil {
334 log.Println("failed to register domain", err)
335 http.Error(w, err.Error(), http.StatusInternalServerError)
336 return
337 }
338
339 // set permissions for this did as owner
340 reg, err := db.RegistrationByDomain(tx, domain)
341 if err != nil {
342 log.Println("failed to register domain", err)
343 http.Error(w, err.Error(), http.StatusInternalServerError)
344 return
345 }
346
347 // add basic acls for this domain
348 err = s.enforcer.AddKnot(domain)
349 if err != nil {
350 log.Println("failed to setup owner of domain", err)
351 http.Error(w, err.Error(), http.StatusInternalServerError)
352 return
353 }
354
355 // add this did as owner of this domain
356 err = s.enforcer.AddKnotOwner(domain, reg.ByDid)
357 if err != nil {
358 log.Println("failed to setup owner of domain", err)
359 http.Error(w, err.Error(), http.StatusInternalServerError)
360 return
361 }
362
363 err = tx.Commit()
364 if err != nil {
365 log.Println("failed to commit changes", err)
366 http.Error(w, err.Error(), http.StatusInternalServerError)
367 return
368 }
369
370 err = s.enforcer.E.SavePolicy()
371 if err != nil {
372 log.Println("failed to update ACLs", err)
373 http.Error(w, err.Error(), http.StatusInternalServerError)
374 return
375 }
376
377 // add this knot to knotstream
378 go s.knotstream.AddSource(
379 context.Background(),
380 eventconsumer.NewKnotSource(domain),
381 )
382
383 w.Write([]byte("check success"))
384}
385
386func (s *State) KnotServerInfo(w http.ResponseWriter, r *http.Request) {
387 domain := chi.URLParam(r, "domain")
388 if domain == "" {
389 http.Error(w, "malformed url", http.StatusBadRequest)
390 return
391 }
392
393 user := s.oauth.GetUser(r)
394 reg, err := db.RegistrationByDomain(s.db, domain)
395 if err != nil {
396 w.Write([]byte("failed to pull up registration info"))
397 return
398 }
399
400 var members []string
401 if reg.Registered != nil {
402 members, err = s.enforcer.GetUserByRole("server:member", domain)
403 if err != nil {
404 w.Write([]byte("failed to fetch member list"))
405 return
406 }
407 }
408
409 var didsToResolve []string
410 for _, m := range members {
411 didsToResolve = append(didsToResolve, m)
412 }
413 didsToResolve = append(didsToResolve, reg.ByDid)
414 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve)
415 didHandleMap := make(map[string]string)
416 for _, identity := range resolvedIds {
417 if !identity.Handle.IsInvalidHandle() {
418 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String())
419 } else {
420 didHandleMap[identity.DID.String()] = identity.DID.String()
421 }
422 }
423
424 ok, err := s.enforcer.IsKnotOwner(user.Did, domain)
425 isOwner := err == nil && ok
426
427 p := pages.KnotParams{
428 LoggedInUser: user,
429 DidHandleMap: didHandleMap,
430 Registration: reg,
431 Members: members,
432 IsOwner: isOwner,
433 }
434
435 s.pages.Knot(w, p)
436}
437
438// get knots registered by this user
439func (s *State) Knots(w http.ResponseWriter, r *http.Request) {
440 // for now, this is just pubkeys
441 user := s.oauth.GetUser(r)
442 registrations, err := db.RegistrationsByDid(s.db, user.Did)
443 if err != nil {
444 log.Println(err)
445 }
446
447 s.pages.Knots(w, pages.KnotsParams{
448 LoggedInUser: user,
449 Registrations: registrations,
450 })
451}
452
453// list members of domain, requires auth and requires owner status
454func (s *State) ListMembers(w http.ResponseWriter, r *http.Request) {
455 domain := chi.URLParam(r, "domain")
456 if domain == "" {
457 http.Error(w, "malformed url", http.StatusBadRequest)
458 return
459 }
460
461 // list all members for this domain
462 memberDids, err := s.enforcer.GetUserByRole("server:member", domain)
463 if err != nil {
464 w.Write([]byte("failed to fetch member list"))
465 return
466 }
467
468 w.Write([]byte(strings.Join(memberDids, "\n")))
469 return
470}
471
472// add member to domain, requires auth and requires invite access
473func (s *State) AddMember(w http.ResponseWriter, r *http.Request) {
474 domain := chi.URLParam(r, "domain")
475 if domain == "" {
476 http.Error(w, "malformed url", http.StatusBadRequest)
477 return
478 }
479
480 subjectIdentifier := r.FormValue("subject")
481 if subjectIdentifier == "" {
482 http.Error(w, "malformed form", http.StatusBadRequest)
483 return
484 }
485
486 subjectIdentity, err := s.idResolver.ResolveIdent(r.Context(), subjectIdentifier)
487 if err != nil {
488 w.Write([]byte("failed to resolve member did to a handle"))
489 return
490 }
491 log.Printf("adding %s to %s\n", subjectIdentity.Handle.String(), domain)
492
493 // announce this relation into the firehose, store into owners' pds
494 client, err := s.oauth.AuthorizedClient(r)
495 if err != nil {
496 http.Error(w, "failed to authorize client", http.StatusInternalServerError)
497 return
498 }
499 currentUser := s.oauth.GetUser(r)
500 createdAt := time.Now().Format(time.RFC3339)
501 resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
502 Collection: tangled.KnotMemberNSID,
503 Repo: currentUser.Did,
504 Rkey: appview.TID(),
505 Record: &lexutil.LexiconTypeDecoder{
506 Val: &tangled.KnotMember{
507 Subject: subjectIdentity.DID.String(),
508 Domain: domain,
509 CreatedAt: createdAt,
510 }},
511 })
512
513 // invalid record
514 if err != nil {
515 log.Printf("failed to create record: %s", err)
516 return
517 }
518 log.Println("created atproto record: ", resp.Uri)
519
520 secret, err := db.GetRegistrationKey(s.db, domain)
521 if err != nil {
522 log.Printf("no key found for domain %s: %s\n", domain, err)
523 return
524 }
525
526 ksClient, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev)
527 if err != nil {
528 log.Println("failed to create client to ", domain)
529 return
530 }
531
532 ksResp, err := ksClient.AddMember(subjectIdentity.DID.String())
533 if err != nil {
534 log.Printf("failed to make request to %s: %s", domain, err)
535 return
536 }
537
538 if ksResp.StatusCode != http.StatusNoContent {
539 w.Write([]byte(fmt.Sprint("knotserver failed to add member: ", err)))
540 return
541 }
542
543 err = s.enforcer.AddKnotMember(domain, subjectIdentity.DID.String())
544 if err != nil {
545 w.Write([]byte(fmt.Sprint("failed to add member: ", err)))
546 return
547 }
548
549 w.Write([]byte(fmt.Sprint("added member: ", subjectIdentity.Handle.String())))
550}
551
552func (s *State) RemoveMember(w http.ResponseWriter, r *http.Request) {
553}
554
// validateRepoName checks that name is usable as a repository name and
// returns nil when it is.
//
// A valid name is non-empty, consists only of ASCII letters, digits, '-', '_'
// and '.', does not start or end with '.', and contains no "..". Path
// separators ('/' and '\') are rejected up front so the name can never be used
// for path traversal.
//
// The error text starts with a capital letter because callers show it to the
// user verbatim.
func validateRepoName(name string) error {
	// an empty name would otherwise pass every check below
	if name == "" {
		return fmt.Errorf("Repository name cannot be empty")
	}

	// check for path traversal attempts
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return fmt.Errorf("Repository name contains invalid path characters")
	}

	// With separators already excluded, "./" and "../" cannot occur; only the
	// leading and trailing dot remain to be checked.
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	// then continue with character validation
	for _, char := range name {
		switch {
		case char >= 'a' && char <= 'z',
			char >= 'A' && char <= 'Z',
			char >= '0' && char <= '9',
			char == '-', char == '_', char == '.':
			// allowed
		default:
			return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// additional check to prevent multiple sequential dots
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	// if all checks pass
	return nil
}
586
587func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
588 switch r.Method {
589 case http.MethodGet:
590 user := s.oauth.GetUser(r)
591 knots, err := s.enforcer.GetKnotsForUser(user.Did)
592 if err != nil {
593 s.pages.Notice(w, "repo", "Invalid user account.")
594 return
595 }
596
597 s.pages.NewRepo(w, pages.NewRepoParams{
598 LoggedInUser: user,
599 Knots: knots,
600 })
601
602 case http.MethodPost:
603 user := s.oauth.GetUser(r)
604
605 domain := r.FormValue("domain")
606 if domain == "" {
607 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
608 return
609 }
610
611 repoName := r.FormValue("name")
612 if repoName == "" {
613 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
614 return
615 }
616
617 if err := validateRepoName(repoName); err != nil {
618 s.pages.Notice(w, "repo", err.Error())
619 return
620 }
621
622 defaultBranch := r.FormValue("branch")
623 if defaultBranch == "" {
624 defaultBranch = "main"
625 }
626
627 description := r.FormValue("description")
628
629 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
630 if err != nil || !ok {
631 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
632 return
633 }
634
635 existingRepo, err := db.GetRepo(s.db, user.Did, repoName)
636 if err == nil && existingRepo != nil {
637 s.pages.Notice(w, "repo", fmt.Sprintf("A repo by this name already exists on %s", existingRepo.Knot))
638 return
639 }
640
641 secret, err := db.GetRegistrationKey(s.db, domain)
642 if err != nil {
643 s.pages.Notice(w, "repo", fmt.Sprintf("No registration key found for knot %s.", domain))
644 return
645 }
646
647 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev)
648 if err != nil {
649 s.pages.Notice(w, "repo", "Failed to connect to knot server.")
650 return
651 }
652
653 rkey := appview.TID()
654 repo := &db.Repo{
655 Did: user.Did,
656 Name: repoName,
657 Knot: domain,
658 Rkey: rkey,
659 Description: description,
660 }
661
662 xrpcClient, err := s.oauth.AuthorizedClient(r)
663 if err != nil {
664 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
665 return
666 }
667
668 createdAt := time.Now().Format(time.RFC3339)
669 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
670 Collection: tangled.RepoNSID,
671 Repo: user.Did,
672 Rkey: rkey,
673 Record: &lexutil.LexiconTypeDecoder{
674 Val: &tangled.Repo{
675 Knot: repo.Knot,
676 Name: repoName,
677 CreatedAt: createdAt,
678 Owner: user.Did,
679 }},
680 })
681 if err != nil {
682 log.Printf("failed to create record: %s", err)
683 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
684 return
685 }
686 log.Println("created repo record: ", atresp.Uri)
687
688 tx, err := s.db.BeginTx(r.Context(), nil)
689 if err != nil {
690 log.Println(err)
691 s.pages.Notice(w, "repo", "Failed to save repository information.")
692 return
693 }
694 defer func() {
695 tx.Rollback()
696 err = s.enforcer.E.LoadPolicy()
697 if err != nil {
698 log.Println("failed to rollback policies")
699 }
700 }()
701
702 resp, err := client.NewRepo(user.Did, repoName, defaultBranch)
703 if err != nil {
704 s.pages.Notice(w, "repo", "Failed to create repository on knot server.")
705 return
706 }
707
708 switch resp.StatusCode {
709 case http.StatusConflict:
710 s.pages.Notice(w, "repo", "A repository with that name already exists.")
711 return
712 case http.StatusInternalServerError:
713 s.pages.Notice(w, "repo", "Failed to create repository on knot. Try again later.")
714 case http.StatusNoContent:
715 // continue
716 }
717
718 repo.AtUri = atresp.Uri
719 err = db.AddRepo(tx, repo)
720 if err != nil {
721 log.Println(err)
722 s.pages.Notice(w, "repo", "Failed to save repository information.")
723 return
724 }
725
726 // acls
727 p, _ := securejoin.SecureJoin(user.Did, repoName)
728 err = s.enforcer.AddRepo(user.Did, domain, p)
729 if err != nil {
730 log.Println(err)
731 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
732 return
733 }
734
735 err = tx.Commit()
736 if err != nil {
737 log.Println("failed to commit changes", err)
738 http.Error(w, err.Error(), http.StatusInternalServerError)
739 return
740 }
741
742 err = s.enforcer.E.SavePolicy()
743 if err != nil {
744 log.Println("failed to update ACLs", err)
745 http.Error(w, err.Error(), http.StatusInternalServerError)
746 return
747 }
748
749 if !s.config.Core.Dev {
750 err = s.posthog.Enqueue(posthog.Capture{
751 DistinctId: user.Did,
752 Event: "new_repo",
753 Properties: posthog.Properties{"repo": repoName, "repo_at": repo.AtUri},
754 })
755 if err != nil {
756 log.Println("failed to enqueue posthog event:", err)
757 }
758 }
759
760 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
761 return
762 }
763}