forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "log/slog"
8 "net/http"
9 "strings"
10 "time"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 lexutil "github.com/bluesky-social/indigo/lex/util"
14 securejoin "github.com/cyphar/filepath-securejoin"
15 "github.com/go-chi/chi/v5"
16 "github.com/posthog/posthog-go"
17 "tangled.sh/tangled.sh/core/api/tangled"
18 "tangled.sh/tangled.sh/core/appview"
19 "tangled.sh/tangled.sh/core/appview/cache"
20 "tangled.sh/tangled.sh/core/appview/cache/session"
21 "tangled.sh/tangled.sh/core/appview/config"
22 "tangled.sh/tangled.sh/core/appview/db"
23 "tangled.sh/tangled.sh/core/appview/notify"
24 "tangled.sh/tangled.sh/core/appview/oauth"
25 "tangled.sh/tangled.sh/core/appview/pages"
26 posthogService "tangled.sh/tangled.sh/core/appview/posthog"
27 "tangled.sh/tangled.sh/core/appview/reporesolver"
28 "tangled.sh/tangled.sh/core/eventconsumer"
29 "tangled.sh/tangled.sh/core/idresolver"
30 "tangled.sh/tangled.sh/core/jetstream"
31 "tangled.sh/tangled.sh/core/knotclient"
32 tlog "tangled.sh/tangled.sh/core/log"
33 "tangled.sh/tangled.sh/core/rbac"
34 "tangled.sh/tangled.sh/core/tid"
35)
36
37type State struct {
38 db *db.DB
39 notifier notify.Notifier
40 oauth *oauth.OAuth
41 enforcer *rbac.Enforcer
42 pages *pages.Pages
43 sess *session.SessionStore
44 idResolver *idresolver.Resolver
45 posthog posthog.Client
46 jc *jetstream.JetstreamClient
47 config *config.Config
48 repoResolver *reporesolver.RepoResolver
49 knotstream *eventconsumer.Consumer
50 spindlestream *eventconsumer.Consumer
51}
52
53func Make(ctx context.Context, config *config.Config) (*State, error) {
54 d, err := db.Make(config.Core.DbPath)
55 if err != nil {
56 return nil, fmt.Errorf("failed to create db: %w", err)
57 }
58
59 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
60 if err != nil {
61 return nil, fmt.Errorf("failed to create enforcer: %w", err)
62 }
63
64 pgs := pages.NewPages(config)
65
66 res, err := idresolver.RedisResolver(config.Redis.ToURL())
67 if err != nil {
68 log.Printf("failed to create redis resolver: %v", err)
69 res = idresolver.DefaultResolver()
70 }
71
72 cache := cache.New(config.Redis.Addr)
73 sess := session.New(cache)
74
75 oauth := oauth.NewOAuth(config, sess)
76
77 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
78 if err != nil {
79 return nil, fmt.Errorf("failed to create posthog client: %w", err)
80 }
81
82 repoResolver := reporesolver.New(config, enforcer, res, d)
83
84 wrapper := db.DbWrapper{d}
85 jc, err := jetstream.NewJetstreamClient(
86 config.Jetstream.Endpoint,
87 "appview",
88 []string{
89 tangled.GraphFollowNSID,
90 tangled.FeedStarNSID,
91 tangled.PublicKeyNSID,
92 tangled.RepoArtifactNSID,
93 tangled.ActorProfileNSID,
94 tangled.SpindleMemberNSID,
95 tangled.SpindleNSID,
96 },
97 nil,
98 slog.Default(),
99 wrapper,
100 false,
101
102 // in-memory filter is inapplicalble to appview so
103 // we'll never log dids anyway.
104 false,
105 )
106 if err != nil {
107 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
108 }
109
110 ingester := appview.Ingester{
111 Db: wrapper,
112 Enforcer: enforcer,
113 IdResolver: res,
114 Config: config,
115 Logger: tlog.New("ingester"),
116 }
117 err = jc.StartJetstream(ctx, ingester.Ingest())
118 if err != nil {
119 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
120 }
121
122 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
123 if err != nil {
124 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
125 }
126 knotstream.Start(ctx)
127
128 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
129 if err != nil {
130 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
131 }
132 spindlestream.Start(ctx)
133
134 var notifiers []notify.Notifier
135 if !config.Core.Dev {
136 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog))
137 }
138 notifier := notify.NewMergedNotifier(notifiers...)
139
140 state := &State{
141 d,
142 notifier,
143 oauth,
144 enforcer,
145 pgs,
146 sess,
147 res,
148 posthog,
149 jc,
150 config,
151 repoResolver,
152 knotstream,
153 spindlestream,
154 }
155
156 return state, nil
157}
158
159func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
160 user := s.oauth.GetUser(r)
161 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
162 LoggedInUser: user,
163 })
164}
165
166func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
167 user := s.oauth.GetUser(r)
168 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
169 LoggedInUser: user,
170 })
171}
172
173func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
174 user := s.oauth.GetUser(r)
175
176 timeline, err := db.MakeTimeline(s.db)
177 if err != nil {
178 log.Println(err)
179 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
180 }
181
182 var didsToResolve []string
183 for _, ev := range timeline {
184 if ev.Repo != nil {
185 didsToResolve = append(didsToResolve, ev.Repo.Did)
186 if ev.Source != nil {
187 didsToResolve = append(didsToResolve, ev.Source.Did)
188 }
189 }
190 if ev.Follow != nil {
191 didsToResolve = append(didsToResolve, ev.Follow.UserDid, ev.Follow.SubjectDid)
192 }
193 if ev.Star != nil {
194 didsToResolve = append(didsToResolve, ev.Star.StarredByDid, ev.Star.Repo.Did)
195 }
196 }
197
198 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve)
199 didHandleMap := make(map[string]string)
200 for _, identity := range resolvedIds {
201 if !identity.Handle.IsInvalidHandle() {
202 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String())
203 } else {
204 didHandleMap[identity.DID.String()] = identity.DID.String()
205 }
206 }
207
208 s.pages.Timeline(w, pages.TimelineParams{
209 LoggedInUser: user,
210 Timeline: timeline,
211 DidHandleMap: didHandleMap,
212 })
213
214 return
215}
216
217func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
218 user := chi.URLParam(r, "user")
219 user = strings.TrimPrefix(user, "@")
220
221 if user == "" {
222 w.WriteHeader(http.StatusBadRequest)
223 return
224 }
225
226 id, err := s.idResolver.ResolveIdent(r.Context(), user)
227 if err != nil {
228 w.WriteHeader(http.StatusInternalServerError)
229 return
230 }
231
232 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
233 if err != nil {
234 w.WriteHeader(http.StatusNotFound)
235 return
236 }
237
238 if len(pubKeys) == 0 {
239 w.WriteHeader(http.StatusNotFound)
240 return
241 }
242
243 for _, k := range pubKeys {
244 key := strings.TrimRight(k.Key, "\n")
245 w.Write([]byte(fmt.Sprintln(key)))
246 }
247}
248
// validateRepoName checks that name is safe to use as a repository name.
//
// A valid name is non-empty, contains only ASCII letters, digits, '-', '_'
// and '.', does not start or end with '.', and never contains "..". It
// returns nil for a valid name and a descriptive error otherwise.
//
// The error text is shown to the user verbatim by the caller, which is why
// the messages are capitalized sentences rather than Go-style error strings.
func validateRepoName(name string) error {
	// An empty name would otherwise slip through every check below.
	if name == "" {
		return fmt.Errorf("Repository name cannot be empty")
	}

	// Reject path separators and the special directory names outright.
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return fmt.Errorf("Repository name contains invalid path characters")
	}

	// A leading or trailing dot could form a traversal sequence once the
	// name is joined into a path.
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	for _, c := range name {
		switch {
		case c >= 'a' && c <= 'z',
			c >= 'A' && c <= 'Z',
			c >= '0' && c <= '9',
			c == '-', c == '_', c == '.':
			// allowed character
		default:
			return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// Interior ".." is still a traversal sequence.
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
280
281func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
282 switch r.Method {
283 case http.MethodGet:
284 user := s.oauth.GetUser(r)
285 knots, err := s.enforcer.GetKnotsForUser(user.Did)
286 if err != nil {
287 s.pages.Notice(w, "repo", "Invalid user account.")
288 return
289 }
290
291 s.pages.NewRepo(w, pages.NewRepoParams{
292 LoggedInUser: user,
293 Knots: knots,
294 })
295
296 case http.MethodPost:
297 user := s.oauth.GetUser(r)
298
299 domain := r.FormValue("domain")
300 if domain == "" {
301 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
302 return
303 }
304
305 repoName := r.FormValue("name")
306 if repoName == "" {
307 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
308 return
309 }
310
311 if err := validateRepoName(repoName); err != nil {
312 s.pages.Notice(w, "repo", err.Error())
313 return
314 }
315
316 defaultBranch := r.FormValue("branch")
317 if defaultBranch == "" {
318 defaultBranch = "main"
319 }
320
321 description := r.FormValue("description")
322
323 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
324 if err != nil || !ok {
325 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
326 return
327 }
328
329 existingRepo, err := db.GetRepo(s.db, user.Did, repoName)
330 if err == nil && existingRepo != nil {
331 s.pages.Notice(w, "repo", fmt.Sprintf("A repo by this name already exists on %s", existingRepo.Knot))
332 return
333 }
334
335 secret, err := db.GetRegistrationKey(s.db, domain)
336 if err != nil {
337 s.pages.Notice(w, "repo", fmt.Sprintf("No registration key found for knot %s.", domain))
338 return
339 }
340
341 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev)
342 if err != nil {
343 s.pages.Notice(w, "repo", "Failed to connect to knot server.")
344 return
345 }
346
347 rkey := tid.TID()
348 repo := &db.Repo{
349 Did: user.Did,
350 Name: repoName,
351 Knot: domain,
352 Rkey: rkey,
353 Description: description,
354 }
355
356 xrpcClient, err := s.oauth.AuthorizedClient(r)
357 if err != nil {
358 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
359 return
360 }
361
362 createdAt := time.Now().Format(time.RFC3339)
363 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
364 Collection: tangled.RepoNSID,
365 Repo: user.Did,
366 Rkey: rkey,
367 Record: &lexutil.LexiconTypeDecoder{
368 Val: &tangled.Repo{
369 Knot: repo.Knot,
370 Name: repoName,
371 CreatedAt: createdAt,
372 Owner: user.Did,
373 }},
374 })
375 if err != nil {
376 log.Printf("failed to create record: %s", err)
377 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
378 return
379 }
380 log.Println("created repo record: ", atresp.Uri)
381
382 tx, err := s.db.BeginTx(r.Context(), nil)
383 if err != nil {
384 log.Println(err)
385 s.pages.Notice(w, "repo", "Failed to save repository information.")
386 return
387 }
388 defer func() {
389 tx.Rollback()
390 err = s.enforcer.E.LoadPolicy()
391 if err != nil {
392 log.Println("failed to rollback policies")
393 }
394 }()
395
396 resp, err := client.NewRepo(user.Did, repoName, defaultBranch)
397 if err != nil {
398 s.pages.Notice(w, "repo", "Failed to create repository on knot server.")
399 return
400 }
401
402 switch resp.StatusCode {
403 case http.StatusConflict:
404 s.pages.Notice(w, "repo", "A repository with that name already exists.")
405 return
406 case http.StatusInternalServerError:
407 s.pages.Notice(w, "repo", "Failed to create repository on knot. Try again later.")
408 case http.StatusNoContent:
409 // continue
410 }
411
412 repo.AtUri = atresp.Uri
413 err = db.AddRepo(tx, repo)
414 if err != nil {
415 log.Println(err)
416 s.pages.Notice(w, "repo", "Failed to save repository information.")
417 return
418 }
419
420 // acls
421 p, _ := securejoin.SecureJoin(user.Did, repoName)
422 err = s.enforcer.AddRepo(user.Did, domain, p)
423 if err != nil {
424 log.Println(err)
425 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
426 return
427 }
428
429 err = tx.Commit()
430 if err != nil {
431 log.Println("failed to commit changes", err)
432 http.Error(w, err.Error(), http.StatusInternalServerError)
433 return
434 }
435
436 err = s.enforcer.E.SavePolicy()
437 if err != nil {
438 log.Println("failed to update ACLs", err)
439 http.Error(w, err.Error(), http.StatusInternalServerError)
440 return
441 }
442
443 s.notifier.NewRepo(r.Context(), repo)
444
445 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
446 return
447 }
448}