// Forked from tangled.org/core.
// Monorepo for Tangled — https://tangled.org
1package state
2
3import (
4 "context"
5 "crypto/hmac"
6 "crypto/sha256"
7 "encoding/hex"
8 "fmt"
9 "log"
10 "log/slog"
11 "net/http"
12 "strings"
13 "time"
14
15 comatproto "github.com/bluesky-social/indigo/api/atproto"
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 lexutil "github.com/bluesky-social/indigo/lex/util"
18 securejoin "github.com/cyphar/filepath-securejoin"
19 "github.com/go-chi/chi/v5"
20 "github.com/posthog/posthog-go"
21 "tangled.sh/tangled.sh/core/api/tangled"
22 "tangled.sh/tangled.sh/core/appview"
23 "tangled.sh/tangled.sh/core/appview/cache"
24 "tangled.sh/tangled.sh/core/appview/cache/session"
25 "tangled.sh/tangled.sh/core/appview/config"
26 "tangled.sh/tangled.sh/core/appview/db"
27 "tangled.sh/tangled.sh/core/appview/idresolver"
28 "tangled.sh/tangled.sh/core/appview/oauth"
29 "tangled.sh/tangled.sh/core/appview/pages"
30 "tangled.sh/tangled.sh/core/appview/reporesolver"
31 "tangled.sh/tangled.sh/core/jetstream"
32 "tangled.sh/tangled.sh/core/knotclient"
33 "tangled.sh/tangled.sh/core/rbac"
34)
35
36type State struct {
37 db *db.DB
38 oauth *oauth.OAuth
39 enforcer *rbac.Enforcer
40 tidClock syntax.TIDClock
41 pages *pages.Pages
42 sess *session.SessionStore
43 idResolver *idresolver.Resolver
44 posthog posthog.Client
45 jc *jetstream.JetstreamClient
46 config *config.Config
47 repoResolver *reporesolver.RepoResolver
48 knotstream *knotclient.EventConsumer
49}
50
51func Make(ctx context.Context, config *config.Config) (*State, error) {
52 d, err := db.Make(config.Core.DbPath)
53 if err != nil {
54 return nil, err
55 }
56
57 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
58 if err != nil {
59 return nil, err
60 }
61
62 clock := syntax.NewTIDClock(0)
63
64 pgs := pages.NewPages(config)
65
66 res, err := idresolver.RedisResolver(config.Redis)
67 if err != nil {
68 log.Printf("failed to create redis resolver: %v", err)
69 res = idresolver.DefaultResolver()
70 }
71
72 cache := cache.New(config.Redis.Addr)
73 sess := session.New(cache)
74
75 oauth := oauth.NewOAuth(config, sess)
76
77 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
78 if err != nil {
79 return nil, fmt.Errorf("failed to create posthog client: %w", err)
80 }
81
82 repoResolver := reporesolver.New(config, enforcer, res, d)
83
84 wrapper := db.DbWrapper{d}
85 jc, err := jetstream.NewJetstreamClient(
86 config.Jetstream.Endpoint,
87 "appview",
88 []string{
89 tangled.GraphFollowNSID,
90 tangled.FeedStarNSID,
91 tangled.PublicKeyNSID,
92 tangled.RepoArtifactNSID,
93 tangled.ActorProfileNSID,
94 },
95 nil,
96 slog.Default(),
97 wrapper,
98 false,
99
100 // in-memory filter is inapplicalble to appview so
101 // we'll never log dids anyway.
102 false,
103 )
104 if err != nil {
105 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
106 }
107 err = jc.StartJetstream(ctx, appview.Ingest(wrapper, enforcer))
108 if err != nil {
109 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
110 }
111
112 knotstream, err := KnotstreamConsumer(ctx, config, d, enforcer, posthog)
113 if err != nil {
114 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
115 }
116 knotstream.Start(ctx)
117
118 state := &State{
119 d,
120 oauth,
121 enforcer,
122 clock,
123 pgs,
124 sess,
125 res,
126 posthog,
127 jc,
128 config,
129 repoResolver,
130 knotstream,
131 }
132
133 return state, nil
134}
135
136func TID(c *syntax.TIDClock) string {
137 return c.Next().String()
138}
139
140func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
141 user := s.oauth.GetUser(r)
142
143 timeline, err := db.MakeTimeline(s.db)
144 if err != nil {
145 log.Println(err)
146 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
147 }
148
149 var didsToResolve []string
150 for _, ev := range timeline {
151 if ev.Repo != nil {
152 didsToResolve = append(didsToResolve, ev.Repo.Did)
153 if ev.Source != nil {
154 didsToResolve = append(didsToResolve, ev.Source.Did)
155 }
156 }
157 if ev.Follow != nil {
158 didsToResolve = append(didsToResolve, ev.Follow.UserDid, ev.Follow.SubjectDid)
159 }
160 if ev.Star != nil {
161 didsToResolve = append(didsToResolve, ev.Star.StarredByDid, ev.Star.Repo.Did)
162 }
163 }
164
165 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve)
166 didHandleMap := make(map[string]string)
167 for _, identity := range resolvedIds {
168 if !identity.Handle.IsInvalidHandle() {
169 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String())
170 } else {
171 didHandleMap[identity.DID.String()] = identity.DID.String()
172 }
173 }
174
175 s.pages.Timeline(w, pages.TimelineParams{
176 LoggedInUser: user,
177 Timeline: timeline,
178 DidHandleMap: didHandleMap,
179 })
180
181 return
182}
183
184// requires auth
185func (s *State) RegistrationKey(w http.ResponseWriter, r *http.Request) {
186 switch r.Method {
187 case http.MethodGet:
188 // list open registrations under this did
189
190 return
191 case http.MethodPost:
192 session, err := s.oauth.Stores().Get(r, oauth.SessionName)
193 if err != nil || session.IsNew {
194 log.Println("unauthorized attempt to generate registration key")
195 http.Error(w, "Forbidden", http.StatusUnauthorized)
196 return
197 }
198
199 did := session.Values[oauth.SessionDid].(string)
200
201 // check if domain is valid url, and strip extra bits down to just host
202 domain := r.FormValue("domain")
203 if domain == "" {
204 http.Error(w, "Invalid form", http.StatusBadRequest)
205 return
206 }
207
208 key, err := db.GenerateRegistrationKey(s.db, domain, did)
209
210 if err != nil {
211 log.Println(err)
212 http.Error(w, "unable to register this domain", http.StatusNotAcceptable)
213 return
214 }
215
216 w.Write([]byte(key))
217 }
218}
219
220func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
221 user := chi.URLParam(r, "user")
222 user = strings.TrimPrefix(user, "@")
223
224 if user == "" {
225 w.WriteHeader(http.StatusBadRequest)
226 return
227 }
228
229 id, err := s.idResolver.ResolveIdent(r.Context(), user)
230 if err != nil {
231 w.WriteHeader(http.StatusInternalServerError)
232 return
233 }
234
235 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
236 if err != nil {
237 w.WriteHeader(http.StatusNotFound)
238 return
239 }
240
241 if len(pubKeys) == 0 {
242 w.WriteHeader(http.StatusNotFound)
243 return
244 }
245
246 for _, k := range pubKeys {
247 key := strings.TrimRight(k.Key, "\n")
248 w.Write([]byte(fmt.Sprintln(key)))
249 }
250}
251
252// create a signed request and check if a node responds to that
253func (s *State) InitKnotServer(w http.ResponseWriter, r *http.Request) {
254 user := s.oauth.GetUser(r)
255
256 domain := chi.URLParam(r, "domain")
257 if domain == "" {
258 http.Error(w, "malformed url", http.StatusBadRequest)
259 return
260 }
261 log.Println("checking ", domain)
262
263 secret, err := db.GetRegistrationKey(s.db, domain)
264 if err != nil {
265 log.Printf("no key found for domain %s: %s\n", domain, err)
266 return
267 }
268
269 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev)
270 if err != nil {
271 log.Println("failed to create client to ", domain)
272 }
273
274 resp, err := client.Init(user.Did)
275 if err != nil {
276 w.Write([]byte("no dice"))
277 log.Println("domain was unreachable after 5 seconds")
278 return
279 }
280
281 if resp.StatusCode == http.StatusConflict {
282 log.Println("status conflict", resp.StatusCode)
283 w.Write([]byte("already registered, sorry!"))
284 return
285 }
286
287 if resp.StatusCode != http.StatusNoContent {
288 log.Println("status nok", resp.StatusCode)
289 w.Write([]byte("no dice"))
290 return
291 }
292
293 // verify response mac
294 signature := resp.Header.Get("X-Signature")
295 signatureBytes, err := hex.DecodeString(signature)
296 if err != nil {
297 return
298 }
299
300 expectedMac := hmac.New(sha256.New, []byte(secret))
301 expectedMac.Write([]byte("ok"))
302
303 if !hmac.Equal(expectedMac.Sum(nil), signatureBytes) {
304 log.Printf("response body signature mismatch: %x\n", signatureBytes)
305 return
306 }
307
308 tx, err := s.db.BeginTx(r.Context(), nil)
309 if err != nil {
310 log.Println("failed to start tx", err)
311 http.Error(w, err.Error(), http.StatusInternalServerError)
312 return
313 }
314 defer func() {
315 tx.Rollback()
316 err = s.enforcer.E.LoadPolicy()
317 if err != nil {
318 log.Println("failed to rollback policies")
319 }
320 }()
321
322 // mark as registered
323 err = db.Register(tx, domain)
324 if err != nil {
325 log.Println("failed to register domain", err)
326 http.Error(w, err.Error(), http.StatusInternalServerError)
327 return
328 }
329
330 // set permissions for this did as owner
331 reg, err := db.RegistrationByDomain(tx, domain)
332 if err != nil {
333 log.Println("failed to register domain", err)
334 http.Error(w, err.Error(), http.StatusInternalServerError)
335 return
336 }
337
338 // add basic acls for this domain
339 err = s.enforcer.AddKnot(domain)
340 if err != nil {
341 log.Println("failed to setup owner of domain", err)
342 http.Error(w, err.Error(), http.StatusInternalServerError)
343 return
344 }
345
346 // add this did as owner of this domain
347 err = s.enforcer.AddKnotOwner(domain, reg.ByDid)
348 if err != nil {
349 log.Println("failed to setup owner of domain", err)
350 http.Error(w, err.Error(), http.StatusInternalServerError)
351 return
352 }
353
354 err = tx.Commit()
355 if err != nil {
356 log.Println("failed to commit changes", err)
357 http.Error(w, err.Error(), http.StatusInternalServerError)
358 return
359 }
360
361 err = s.enforcer.E.SavePolicy()
362 if err != nil {
363 log.Println("failed to update ACLs", err)
364 http.Error(w, err.Error(), http.StatusInternalServerError)
365 return
366 }
367
368 // add this knot to knotstream
369 go s.knotstream.AddSource(context.Background(), knotclient.EventSource{domain})
370
371 w.Write([]byte("check success"))
372}
373
374func (s *State) KnotServerInfo(w http.ResponseWriter, r *http.Request) {
375 domain := chi.URLParam(r, "domain")
376 if domain == "" {
377 http.Error(w, "malformed url", http.StatusBadRequest)
378 return
379 }
380
381 user := s.oauth.GetUser(r)
382 reg, err := db.RegistrationByDomain(s.db, domain)
383 if err != nil {
384 w.Write([]byte("failed to pull up registration info"))
385 return
386 }
387
388 var members []string
389 if reg.Registered != nil {
390 members, err = s.enforcer.GetUserByRole("server:member", domain)
391 if err != nil {
392 w.Write([]byte("failed to fetch member list"))
393 return
394 }
395 }
396
397 var didsToResolve []string
398 for _, m := range members {
399 didsToResolve = append(didsToResolve, m)
400 }
401 didsToResolve = append(didsToResolve, reg.ByDid)
402 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve)
403 didHandleMap := make(map[string]string)
404 for _, identity := range resolvedIds {
405 if !identity.Handle.IsInvalidHandle() {
406 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String())
407 } else {
408 didHandleMap[identity.DID.String()] = identity.DID.String()
409 }
410 }
411
412 ok, err := s.enforcer.IsKnotOwner(user.Did, domain)
413 isOwner := err == nil && ok
414
415 p := pages.KnotParams{
416 LoggedInUser: user,
417 DidHandleMap: didHandleMap,
418 Registration: reg,
419 Members: members,
420 IsOwner: isOwner,
421 }
422
423 s.pages.Knot(w, p)
424}
425
426// get knots registered by this user
427func (s *State) Knots(w http.ResponseWriter, r *http.Request) {
428 // for now, this is just pubkeys
429 user := s.oauth.GetUser(r)
430 registrations, err := db.RegistrationsByDid(s.db, user.Did)
431 if err != nil {
432 log.Println(err)
433 }
434
435 s.pages.Knots(w, pages.KnotsParams{
436 LoggedInUser: user,
437 Registrations: registrations,
438 })
439}
440
441// list members of domain, requires auth and requires owner status
442func (s *State) ListMembers(w http.ResponseWriter, r *http.Request) {
443 domain := chi.URLParam(r, "domain")
444 if domain == "" {
445 http.Error(w, "malformed url", http.StatusBadRequest)
446 return
447 }
448
449 // list all members for this domain
450 memberDids, err := s.enforcer.GetUserByRole("server:member", domain)
451 if err != nil {
452 w.Write([]byte("failed to fetch member list"))
453 return
454 }
455
456 w.Write([]byte(strings.Join(memberDids, "\n")))
457 return
458}
459
460// add member to domain, requires auth and requires invite access
461func (s *State) AddMember(w http.ResponseWriter, r *http.Request) {
462 domain := chi.URLParam(r, "domain")
463 if domain == "" {
464 http.Error(w, "malformed url", http.StatusBadRequest)
465 return
466 }
467
468 subjectIdentifier := r.FormValue("subject")
469 if subjectIdentifier == "" {
470 http.Error(w, "malformed form", http.StatusBadRequest)
471 return
472 }
473
474 subjectIdentity, err := s.idResolver.ResolveIdent(r.Context(), subjectIdentifier)
475 if err != nil {
476 w.Write([]byte("failed to resolve member did to a handle"))
477 return
478 }
479 log.Printf("adding %s to %s\n", subjectIdentity.Handle.String(), domain)
480
481 // announce this relation into the firehose, store into owners' pds
482 client, err := s.oauth.AuthorizedClient(r)
483 if err != nil {
484 http.Error(w, "failed to authorize client", http.StatusInternalServerError)
485 return
486 }
487 currentUser := s.oauth.GetUser(r)
488 createdAt := time.Now().Format(time.RFC3339)
489 resp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
490 Collection: tangled.KnotMemberNSID,
491 Repo: currentUser.Did,
492 Rkey: appview.TID(),
493 Record: &lexutil.LexiconTypeDecoder{
494 Val: &tangled.KnotMember{
495 Subject: subjectIdentity.DID.String(),
496 Domain: domain,
497 CreatedAt: createdAt,
498 }},
499 })
500
501 // invalid record
502 if err != nil {
503 log.Printf("failed to create record: %s", err)
504 return
505 }
506 log.Println("created atproto record: ", resp.Uri)
507
508 secret, err := db.GetRegistrationKey(s.db, domain)
509 if err != nil {
510 log.Printf("no key found for domain %s: %s\n", domain, err)
511 return
512 }
513
514 ksClient, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev)
515 if err != nil {
516 log.Println("failed to create client to ", domain)
517 return
518 }
519
520 ksResp, err := ksClient.AddMember(subjectIdentity.DID.String())
521 if err != nil {
522 log.Printf("failed to make request to %s: %s", domain, err)
523 return
524 }
525
526 if ksResp.StatusCode != http.StatusNoContent {
527 w.Write([]byte(fmt.Sprint("knotserver failed to add member: ", err)))
528 return
529 }
530
531 err = s.enforcer.AddKnotMember(domain, subjectIdentity.DID.String())
532 if err != nil {
533 w.Write([]byte(fmt.Sprint("failed to add member: ", err)))
534 return
535 }
536
537 w.Write([]byte(fmt.Sprint("added member: ", subjectIdentity.Handle.String())))
538}
539
540func (s *State) RemoveMember(w http.ResponseWriter, r *http.Request) {
541}
542
// validateRepoName checks that name is safe to use as a repository name and
// returns a descriptive error when it is not.
//
// A valid name:
//   - is not empty;
//   - is not "." or ".." and contains no "/" or "\";
//   - neither starts nor ends with ".";
//   - consists only of ASCII letters, digits, "-", "_" and ".";
//   - contains no "..".
//
// The error text is shown to the user as-is, which is why it is written as a
// capitalized sentence.
func validateRepoName(name string) error {
	// an empty name would otherwise slip through every check below
	if name == "" {
		return fmt.Errorf("Repository name cannot be empty")
	}

	// check for path traversal attempts
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return fmt.Errorf("Repository name contains invalid path characters")
	}

	// Leading or trailing dots could turn into traversal sequences once the
	// name is joined into a path. ("./" and "../" need no separate check:
	// any "/" has already been rejected above.)
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	// then continue with character validation
	for _, char := range name {
		switch {
		case char >= 'a' && char <= 'z',
			char >= 'A' && char <= 'Z',
			char >= '0' && char <= '9',
			char == '-', char == '_', char == '.':
			// allowed
		default:
			return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// additional check to prevent multiple sequential dots
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	// if all checks pass
	return nil
}
574
575func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
576 switch r.Method {
577 case http.MethodGet:
578 user := s.oauth.GetUser(r)
579 knots, err := s.enforcer.GetKnotsForUser(user.Did)
580 if err != nil {
581 s.pages.Notice(w, "repo", "Invalid user account.")
582 return
583 }
584
585 s.pages.NewRepo(w, pages.NewRepoParams{
586 LoggedInUser: user,
587 Knots: knots,
588 })
589
590 case http.MethodPost:
591 user := s.oauth.GetUser(r)
592
593 domain := r.FormValue("domain")
594 if domain == "" {
595 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
596 return
597 }
598
599 repoName := r.FormValue("name")
600 if repoName == "" {
601 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
602 return
603 }
604
605 if err := validateRepoName(repoName); err != nil {
606 s.pages.Notice(w, "repo", err.Error())
607 return
608 }
609
610 defaultBranch := r.FormValue("branch")
611 if defaultBranch == "" {
612 defaultBranch = "main"
613 }
614
615 description := r.FormValue("description")
616
617 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
618 if err != nil || !ok {
619 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
620 return
621 }
622
623 existingRepo, err := db.GetRepo(s.db, user.Did, repoName)
624 if err == nil && existingRepo != nil {
625 s.pages.Notice(w, "repo", fmt.Sprintf("A repo by this name already exists on %s", existingRepo.Knot))
626 return
627 }
628
629 secret, err := db.GetRegistrationKey(s.db, domain)
630 if err != nil {
631 s.pages.Notice(w, "repo", fmt.Sprintf("No registration key found for knot %s.", domain))
632 return
633 }
634
635 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev)
636 if err != nil {
637 s.pages.Notice(w, "repo", "Failed to connect to knot server.")
638 return
639 }
640
641 rkey := appview.TID()
642 repo := &db.Repo{
643 Did: user.Did,
644 Name: repoName,
645 Knot: domain,
646 Rkey: rkey,
647 Description: description,
648 }
649
650 xrpcClient, err := s.oauth.AuthorizedClient(r)
651 if err != nil {
652 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
653 return
654 }
655
656 createdAt := time.Now().Format(time.RFC3339)
657 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
658 Collection: tangled.RepoNSID,
659 Repo: user.Did,
660 Rkey: rkey,
661 Record: &lexutil.LexiconTypeDecoder{
662 Val: &tangled.Repo{
663 Knot: repo.Knot,
664 Name: repoName,
665 CreatedAt: createdAt,
666 Owner: user.Did,
667 }},
668 })
669 if err != nil {
670 log.Printf("failed to create record: %s", err)
671 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
672 return
673 }
674 log.Println("created repo record: ", atresp.Uri)
675
676 tx, err := s.db.BeginTx(r.Context(), nil)
677 if err != nil {
678 log.Println(err)
679 s.pages.Notice(w, "repo", "Failed to save repository information.")
680 return
681 }
682 defer func() {
683 tx.Rollback()
684 err = s.enforcer.E.LoadPolicy()
685 if err != nil {
686 log.Println("failed to rollback policies")
687 }
688 }()
689
690 resp, err := client.NewRepo(user.Did, repoName, defaultBranch)
691 if err != nil {
692 s.pages.Notice(w, "repo", "Failed to create repository on knot server.")
693 return
694 }
695
696 switch resp.StatusCode {
697 case http.StatusConflict:
698 s.pages.Notice(w, "repo", "A repository with that name already exists.")
699 return
700 case http.StatusInternalServerError:
701 s.pages.Notice(w, "repo", "Failed to create repository on knot. Try again later.")
702 case http.StatusNoContent:
703 // continue
704 }
705
706 repo.AtUri = atresp.Uri
707 err = db.AddRepo(tx, repo)
708 if err != nil {
709 log.Println(err)
710 s.pages.Notice(w, "repo", "Failed to save repository information.")
711 return
712 }
713
714 // acls
715 p, _ := securejoin.SecureJoin(user.Did, repoName)
716 err = s.enforcer.AddRepo(user.Did, domain, p)
717 if err != nil {
718 log.Println(err)
719 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
720 return
721 }
722
723 err = tx.Commit()
724 if err != nil {
725 log.Println("failed to commit changes", err)
726 http.Error(w, err.Error(), http.StatusInternalServerError)
727 return
728 }
729
730 err = s.enforcer.E.SavePolicy()
731 if err != nil {
732 log.Println("failed to update ACLs", err)
733 http.Error(w, err.Error(), http.StatusInternalServerError)
734 return
735 }
736
737 if !s.config.Core.Dev {
738 err = s.posthog.Enqueue(posthog.Capture{
739 DistinctId: user.Did,
740 Event: "new_repo",
741 Properties: posthog.Properties{"repo": repoName, "repo_at": repo.AtUri},
742 })
743 if err != nil {
744 log.Println("failed to enqueue posthog event:", err)
745 }
746 }
747
748 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
749 return
750 }
751}