forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 lexutil "github.com/bluesky-social/indigo/lex/util"
14 securejoin "github.com/cyphar/filepath-securejoin"
15 "github.com/go-chi/chi/v5"
16 "github.com/posthog/posthog-go"
17 "tangled.sh/tangled.sh/core/api/tangled"
18 "tangled.sh/tangled.sh/core/appview"
19 "tangled.sh/tangled.sh/core/appview/cache"
20 "tangled.sh/tangled.sh/core/appview/cache/session"
21 "tangled.sh/tangled.sh/core/appview/config"
22 "tangled.sh/tangled.sh/core/appview/db"
23 "tangled.sh/tangled.sh/core/appview/notify"
24 "tangled.sh/tangled.sh/core/appview/oauth"
25 "tangled.sh/tangled.sh/core/appview/pages"
26 posthogService "tangled.sh/tangled.sh/core/appview/posthog"
27 "tangled.sh/tangled.sh/core/appview/reporesolver"
28 "tangled.sh/tangled.sh/core/eventconsumer"
29 "tangled.sh/tangled.sh/core/idresolver"
30 "tangled.sh/tangled.sh/core/jetstream"
31 "tangled.sh/tangled.sh/core/knotclient"
32 tlog "tangled.sh/tangled.sh/core/log"
33 "tangled.sh/tangled.sh/core/rbac"
34 "tangled.sh/tangled.sh/core/tid"
35)
36
37type State struct {
38 db *db.DB
39 notifier notify.Notifier
40 oauth *oauth.OAuth
41 enforcer *rbac.Enforcer
42 pages *pages.Pages
43 sess *session.SessionStore
44 idResolver *idresolver.Resolver
45 posthog posthog.Client
46 jc *jetstream.JetstreamClient
47 config *config.Config
48 repoResolver *reporesolver.RepoResolver
49 knotstream *eventconsumer.Consumer
50 spindlestream *eventconsumer.Consumer
51}
52
53func Make(ctx context.Context, config *config.Config) (*State, error) {
54 d, err := db.Make(config.Core.DbPath)
55 if err != nil {
56 return nil, fmt.Errorf("failed to create db: %w", err)
57 }
58
59 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
60 if err != nil {
61 return nil, fmt.Errorf("failed to create enforcer: %w", err)
62 }
63
64 pgs := pages.NewPages(config)
65
66 res, err := idresolver.RedisResolver(config.Redis.ToURL())
67 if err != nil {
68 log.Printf("failed to create redis resolver: %v", err)
69 res = idresolver.DefaultResolver()
70 }
71
72 cache := cache.New(config.Redis.Addr)
73 sess := session.New(cache)
74
75 oauth := oauth.NewOAuth(config, sess)
76
77 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
78 if err != nil {
79 return nil, fmt.Errorf("failed to create posthog client: %w", err)
80 }
81
82 repoResolver := reporesolver.New(config, enforcer, res, d)
83
84 wrapper := db.DbWrapper{d}
85 jc, err := jetstream.NewJetstreamClient(
86 config.Jetstream.Endpoint,
87 "appview",
88 []string{
89 tangled.GraphFollowNSID,
90 tangled.FeedStarNSID,
91 tangled.PublicKeyNSID,
92 tangled.RepoArtifactNSID,
93 tangled.ActorProfileNSID,
94 tangled.SpindleMemberNSID,
95 tangled.SpindleNSID,
96 tangled.StringNSID,
97 },
98 nil,
99 slog.Default(),
100 wrapper,
101 false,
102
103 // in-memory filter is inapplicalble to appview so
104 // we'll never log dids anyway.
105 false,
106 )
107 if err != nil {
108 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
109 }
110
111 ingester := appview.Ingester{
112 Db: wrapper,
113 Enforcer: enforcer,
114 IdResolver: res,
115 Config: config,
116 Logger: tlog.New("ingester"),
117 }
118 err = jc.StartJetstream(ctx, ingester.Ingest())
119 if err != nil {
120 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
121 }
122
123 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
124 if err != nil {
125 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
126 }
127 knotstream.Start(ctx)
128
129 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
130 if err != nil {
131 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
132 }
133 spindlestream.Start(ctx)
134
135 var notifiers []notify.Notifier
136 if !config.Core.Dev {
137 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog))
138 }
139 notifier := notify.NewMergedNotifier(notifiers...)
140
141 state := &State{
142 d,
143 notifier,
144 oauth,
145 enforcer,
146 pgs,
147 sess,
148 res,
149 posthog,
150 jc,
151 config,
152 repoResolver,
153 knotstream,
154 spindlestream,
155 }
156
157 return state, nil
158}
159
160func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
161 user := s.oauth.GetUser(r)
162 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
163 LoggedInUser: user,
164 })
165}
166
167func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
168 user := s.oauth.GetUser(r)
169 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
170 LoggedInUser: user,
171 })
172}
173
174func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
175 user := s.oauth.GetUser(r)
176
177 timeline, err := db.MakeTimeline(s.db)
178 if err != nil {
179 log.Println(err)
180 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
181 }
182
183 var didsToResolve []string
184 for _, ev := range timeline {
185 if ev.Repo != nil {
186 didsToResolve = append(didsToResolve, ev.Repo.Did)
187 if ev.Source != nil {
188 didsToResolve = append(didsToResolve, ev.Source.Did)
189 }
190 }
191 if ev.Follow != nil {
192 didsToResolve = append(didsToResolve, ev.Follow.UserDid, ev.Follow.SubjectDid)
193 }
194 if ev.Star != nil {
195 didsToResolve = append(didsToResolve, ev.Star.StarredByDid, ev.Star.Repo.Did)
196 }
197 }
198
199 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve)
200 didHandleMap := make(map[string]string)
201 for _, identity := range resolvedIds {
202 if !identity.Handle.IsInvalidHandle() {
203 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String())
204 } else {
205 didHandleMap[identity.DID.String()] = identity.DID.String()
206 }
207 }
208
209 s.pages.Timeline(w, pages.TimelineParams{
210 LoggedInUser: user,
211 Timeline: timeline,
212 DidHandleMap: didHandleMap,
213 })
214
215 return
216}
217
218func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
219 user := chi.URLParam(r, "user")
220 user = strings.TrimPrefix(user, "@")
221
222 if user == "" {
223 w.WriteHeader(http.StatusBadRequest)
224 return
225 }
226
227 id, err := s.idResolver.ResolveIdent(r.Context(), user)
228 if err != nil {
229 w.WriteHeader(http.StatusInternalServerError)
230 return
231 }
232
233 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
234 if err != nil {
235 w.WriteHeader(http.StatusNotFound)
236 return
237 }
238
239 if len(pubKeys) == 0 {
240 w.WriteHeader(http.StatusNotFound)
241 return
242 }
243
244 for _, k := range pubKeys {
245 key := strings.TrimRight(k.Key, "\n")
246 w.Write([]byte(fmt.Sprintln(key)))
247 }
248}
249
250func validateRepoName(name string) error {
251 // check for path traversal attempts
252 if name == "." || name == ".." ||
253 strings.Contains(name, "/") || strings.Contains(name, "\\") {
254 return fmt.Errorf("Repository name contains invalid path characters")
255 }
256
257 // check for sequences that could be used for traversal when normalized
258 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
259 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
260 return fmt.Errorf("Repository name contains invalid path sequence")
261 }
262
263 // then continue with character validation
264 for _, char := range name {
265 if !((char >= 'a' && char <= 'z') ||
266 (char >= 'A' && char <= 'Z') ||
267 (char >= '0' && char <= '9') ||
268 char == '-' || char == '_' || char == '.') {
269 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
270 }
271 }
272
273 // additional check to prevent multiple sequential dots
274 if strings.Contains(name, "..") {
275 return fmt.Errorf("Repository name cannot contain sequential dots")
276 }
277
278 // if all checks pass
279 return nil
280}
281
282func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
283 switch r.Method {
284 case http.MethodGet:
285 user := s.oauth.GetUser(r)
286 knots, err := s.enforcer.GetKnotsForUser(user.Did)
287 if err != nil {
288 s.pages.Notice(w, "repo", "Invalid user account.")
289 return
290 }
291
292 s.pages.NewRepo(w, pages.NewRepoParams{
293 LoggedInUser: user,
294 Knots: knots,
295 })
296
297 case http.MethodPost:
298 user := s.oauth.GetUser(r)
299
300 domain := r.FormValue("domain")
301 if domain == "" {
302 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
303 return
304 }
305
306 repoName := r.FormValue("name")
307 if repoName == "" {
308 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
309 return
310 }
311
312 if err := validateRepoName(repoName); err != nil {
313 s.pages.Notice(w, "repo", err.Error())
314 return
315 }
316
317 defaultBranch := r.FormValue("branch")
318 if defaultBranch == "" {
319 defaultBranch = "main"
320 }
321
322 description := r.FormValue("description")
323
324 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
325 if err != nil || !ok {
326 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
327 return
328 }
329
330 existingRepo, err := db.GetRepo(s.db, user.Did, repoName)
331 if err == nil && existingRepo != nil {
332 s.pages.Notice(w, "repo", fmt.Sprintf("A repo by this name already exists on %s", existingRepo.Knot))
333 return
334 }
335
336 secret, err := db.GetRegistrationKey(s.db, domain)
337 if err != nil {
338 s.pages.Notice(w, "repo", fmt.Sprintf("No registration key found for knot %s.", domain))
339 return
340 }
341
342 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev)
343 if err != nil {
344 s.pages.Notice(w, "repo", "Failed to connect to knot server.")
345 return
346 }
347
348 rkey := tid.TID()
349 repo := &db.Repo{
350 Did: user.Did,
351 Name: repoName,
352 Knot: domain,
353 Rkey: rkey,
354 Description: description,
355 }
356
357 xrpcClient, err := s.oauth.AuthorizedClient(r)
358 if err != nil {
359 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
360 return
361 }
362
363 createdAt := time.Now().Format(time.RFC3339)
364 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
365 Collection: tangled.RepoNSID,
366 Repo: user.Did,
367 Rkey: rkey,
368 Record: &lexutil.LexiconTypeDecoder{
369 Val: &tangled.Repo{
370 Knot: repo.Knot,
371 Name: repoName,
372 CreatedAt: createdAt,
373 Owner: user.Did,
374 }},
375 })
376 if err != nil {
377 log.Printf("failed to create record: %s", err)
378 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
379 return
380 }
381 log.Println("created repo record: ", atresp.Uri)
382
383 tx, err := s.db.BeginTx(r.Context(), nil)
384 if err != nil {
385 log.Println(err)
386 s.pages.Notice(w, "repo", "Failed to save repository information.")
387 return
388 }
389 defer func() {
390 tx.Rollback()
391 err = s.enforcer.E.LoadPolicy()
392 if err != nil {
393 log.Println("failed to rollback policies")
394 }
395 }()
396
397 resp, err := client.NewRepo(user.Did, repoName, defaultBranch)
398 if err != nil {
399 s.pages.Notice(w, "repo", "Failed to create repository on knot server.")
400 return
401 }
402
403 switch resp.StatusCode {
404 case http.StatusConflict:
405 s.pages.Notice(w, "repo", "A repository with that name already exists.")
406 return
407 case http.StatusInternalServerError:
408 s.pages.Notice(w, "repo", "Failed to create repository on knot. Try again later.")
409 case http.StatusNoContent:
410 // continue
411 }
412
413 repo.AtUri = atresp.Uri
414 err = db.AddRepo(tx, repo)
415 if err != nil {
416 log.Println(err)
417 s.pages.Notice(w, "repo", "Failed to save repository information.")
418 return
419 }
420
421 // acls
422 p, _ := securejoin.SecureJoin(user.Did, repoName)
423 err = s.enforcer.AddRepo(user.Did, domain, p)
424 if err != nil {
425 log.Println(err)
426 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
427 return
428 }
429
430 err = tx.Commit()
431 if err != nil {
432 log.Println("failed to commit changes", err)
433 http.Error(w, err.Error(), http.StatusInternalServerError)
434 return
435 }
436
437 err = s.enforcer.E.SavePolicy()
438 if err != nil {
439 log.Println("failed to update ACLs", err)
440 http.Error(w, err.Error(), http.StatusInternalServerError)
441 return
442 }
443
444 s.notifier.NewRepo(r.Context(), repo)
445
446 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
447 return
448 }
449}