1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "strings"
12 "time"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 securejoin "github.com/cyphar/filepath-securejoin"
18 "github.com/go-chi/chi/v5"
19 "github.com/posthog/posthog-go"
20 "tangled.sh/tangled.sh/core/api/tangled"
21 "tangled.sh/tangled.sh/core/appview"
22 "tangled.sh/tangled.sh/core/appview/cache"
23 "tangled.sh/tangled.sh/core/appview/cache/session"
24 "tangled.sh/tangled.sh/core/appview/config"
25 "tangled.sh/tangled.sh/core/appview/db"
26 "tangled.sh/tangled.sh/core/appview/notify"
27 "tangled.sh/tangled.sh/core/appview/oauth"
28 "tangled.sh/tangled.sh/core/appview/pages"
29 posthogService "tangled.sh/tangled.sh/core/appview/posthog"
30 "tangled.sh/tangled.sh/core/appview/reporesolver"
31 xrpcclient "tangled.sh/tangled.sh/core/appview/xrpcclient"
32 "tangled.sh/tangled.sh/core/eventconsumer"
33 "tangled.sh/tangled.sh/core/idresolver"
34 "tangled.sh/tangled.sh/core/jetstream"
35 tlog "tangled.sh/tangled.sh/core/log"
36 "tangled.sh/tangled.sh/core/rbac"
37 "tangled.sh/tangled.sh/core/tid"
38 // xrpcerr "tangled.sh/tangled.sh/core/xrpc/errors"
39)
40
// State bundles the long-lived dependencies that the appview's HTTP handlers
// in this package share. It is constructed by Make.
//
// Reordering the fields is only safe if nothing constructs State with an
// unkeyed composite literal.
type State struct {
	// db is the appview database handle passed to the db package helpers.
	db *db.DB
	// notifier receives events such as newly created repositories.
	notifier notify.Notifier
	// oauth resolves the logged-in user and produces authorized clients.
	oauth *oauth.OAuth
	// enforcer performs RBAC checks and stores repository ACLs.
	enforcer *rbac.Enforcer
	// pages renders responses (full pages, notices, redirects).
	pages *pages.Pages
	// sess is the session store handed to the OAuth layer.
	sess *session.SessionStore
	// idResolver resolves handles/DIDs to identities.
	idResolver *idresolver.Resolver
	// posthog is the PostHog client.
	posthog posthog.Client
	// jc is the jetstream client started by Make.
	jc *jetstream.JetstreamClient
	// config is the appview configuration this State was built from.
	config *config.Config
	// repoResolver is built from the config, enforcer, resolver and db.
	repoResolver *reporesolver.RepoResolver
	// knotstream and spindlestream are the event consumers started by Make.
	knotstream    *eventconsumer.Consumer
	spindlestream *eventconsumer.Consumer
	// logger is the base structured logger used by handlers.
	logger *slog.Logger
}
57
58func Make(ctx context.Context, config *config.Config) (*State, error) {
59 d, err := db.Make(config.Core.DbPath)
60 if err != nil {
61 return nil, fmt.Errorf("failed to create db: %w", err)
62 }
63
64 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
65 if err != nil {
66 return nil, fmt.Errorf("failed to create enforcer: %w", err)
67 }
68
69 res, err := idresolver.RedisResolver(config.Redis.ToURL())
70 if err != nil {
71 log.Printf("failed to create redis resolver: %v", err)
72 res = idresolver.DefaultResolver()
73 }
74
75 pgs := pages.NewPages(config, res)
76
77 cache := cache.New(config.Redis.Addr)
78 sess := session.New(cache)
79
80 oauth := oauth.NewOAuth(config, sess)
81
82 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
83 if err != nil {
84 return nil, fmt.Errorf("failed to create posthog client: %w", err)
85 }
86
87 repoResolver := reporesolver.New(config, enforcer, res, d)
88
89 wrapper := db.DbWrapper{d}
90 jc, err := jetstream.NewJetstreamClient(
91 config.Jetstream.Endpoint,
92 "appview",
93 []string{
94 tangled.GraphFollowNSID,
95 tangled.FeedStarNSID,
96 tangled.PublicKeyNSID,
97 tangled.RepoArtifactNSID,
98 tangled.ActorProfileNSID,
99 tangled.SpindleMemberNSID,
100 tangled.SpindleNSID,
101 tangled.StringNSID,
102 },
103 nil,
104 slog.Default(),
105 wrapper,
106 false,
107
108 // in-memory filter is inapplicalble to appview so
109 // we'll never log dids anyway.
110 false,
111 )
112 if err != nil {
113 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
114 }
115
116 ingester := appview.Ingester{
117 Db: wrapper,
118 Enforcer: enforcer,
119 IdResolver: res,
120 Config: config,
121 Logger: tlog.New("ingester"),
122 }
123 err = jc.StartJetstream(ctx, ingester.Ingest())
124 if err != nil {
125 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
126 }
127
128 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
129 if err != nil {
130 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
131 }
132 knotstream.Start(ctx)
133
134 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
135 if err != nil {
136 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
137 }
138 spindlestream.Start(ctx)
139
140 var notifiers []notify.Notifier
141 if !config.Core.Dev {
142 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog))
143 }
144 notifier := notify.NewMergedNotifier(notifiers...)
145
146 state := &State{
147 d,
148 notifier,
149 oauth,
150 enforcer,
151 pgs,
152 sess,
153 res,
154 posthog,
155 jc,
156 config,
157 repoResolver,
158 knotstream,
159 spindlestream,
160 slog.Default(),
161 }
162
163 return state, nil
164}
165
166func (s *State) Favicon(w http.ResponseWriter, r *http.Request) {
167 w.Header().Set("Content-Type", "image/svg+xml")
168 w.Header().Set("Cache-Control", "public, max-age=31536000") // one year
169 w.Header().Set("ETag", `"favicon-svg-v1"`)
170
171 if match := r.Header.Get("If-None-Match"); match == `"favicon-svg-v1"` {
172 w.WriteHeader(http.StatusNotModified)
173 return
174 }
175
176 s.pages.Favicon(w)
177}
178
179func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
180 user := s.oauth.GetUser(r)
181 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
182 LoggedInUser: user,
183 })
184}
185
186func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
187 user := s.oauth.GetUser(r)
188 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
189 LoggedInUser: user,
190 })
191}
192
193func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
194 user := s.oauth.GetUser(r)
195
196 timeline, err := db.MakeTimeline(s.db)
197 if err != nil {
198 log.Println(err)
199 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
200 }
201
202 repos, err := db.GetTopStarredReposLastWeek(s.db)
203 if err != nil {
204 log.Println(err)
205 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
206 return
207 }
208
209 s.pages.Timeline(w, pages.TimelineParams{
210 LoggedInUser: user,
211 Timeline: timeline,
212 Repos: repos,
213 })
214}
215
216func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
217 user := chi.URLParam(r, "user")
218 user = strings.TrimPrefix(user, "@")
219
220 if user == "" {
221 w.WriteHeader(http.StatusBadRequest)
222 return
223 }
224
225 id, err := s.idResolver.ResolveIdent(r.Context(), user)
226 if err != nil {
227 w.WriteHeader(http.StatusInternalServerError)
228 return
229 }
230
231 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
232 if err != nil {
233 w.WriteHeader(http.StatusNotFound)
234 return
235 }
236
237 if len(pubKeys) == 0 {
238 w.WriteHeader(http.StatusNotFound)
239 return
240 }
241
242 for _, k := range pubKeys {
243 key := strings.TrimRight(k.Key, "\n")
244 w.Write([]byte(fmt.Sprintln(key)))
245 }
246}
247
// validateRepoName reports whether name is acceptable as a repository name.
//
// Checks run in this order, and the first failure decides the error message:
//  1. name is "." or "..", or contains a forward or backward slash;
//  2. name starts or ends with a period;
//  3. name contains a character other than ASCII letters, digits, '-', '_'
//     and '.';
//  4. name contains two consecutive periods.
//
// The empty string passes all checks; callers reject it separately.
func validateRepoName(name string) error {
	// Path separators and the bare relative-directory names.
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return fmt.Errorf("Repository name contains invalid path characters")
	}

	// Leading/trailing dots. Sequences such as "./" or "../" need no
	// separate check because any slash was already rejected above.
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	// Allowed character set.
	for _, r := range name {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
		case r == '-', r == '_', r == '.':
		default:
			return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// Interior runs of dots.
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
279
// stripGitExt removes a single trailing ".git" from name, if present.
func stripGitExt(name string) string {
	const ext = ".git"
	if !strings.HasSuffix(name, ext) {
		return name
	}
	return name[:len(name)-len(ext)]
}
283
284func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
285 switch r.Method {
286 case http.MethodGet:
287 user := s.oauth.GetUser(r)
288 knots, err := s.enforcer.GetKnotsForUser(user.Did)
289 if err != nil {
290 s.pages.Notice(w, "repo", "Invalid user account.")
291 return
292 }
293
294 s.pages.NewRepo(w, pages.NewRepoParams{
295 LoggedInUser: user,
296 Knots: knots,
297 })
298
299 case http.MethodPost:
300 l := s.logger.With("handler", "NewRepo")
301
302 user := s.oauth.GetUser(r)
303 l = l.With("did", user.Did)
304 l = l.With("handle", user.Handle)
305
306 // form validation
307 domain := r.FormValue("domain")
308 if domain == "" {
309 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
310 return
311 }
312 l = l.With("knot", domain)
313
314 repoName := r.FormValue("name")
315 if repoName == "" {
316 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
317 return
318 }
319
320 if err := validateRepoName(repoName); err != nil {
321 s.pages.Notice(w, "repo", err.Error())
322 return
323 }
324 repoName = stripGitExt(repoName)
325 l = l.With("repoName", repoName)
326
327 defaultBranch := r.FormValue("branch")
328 if defaultBranch == "" {
329 defaultBranch = "main"
330 }
331 l = l.With("defaultBranch", defaultBranch)
332
333 description := r.FormValue("description")
334
335 // ACL validation
336 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
337 if err != nil || !ok {
338 l.Info("unauthorized")
339 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
340 return
341 }
342
343 // Check for existing repos
344 existingRepo, err := db.GetRepo(s.db, user.Did, repoName)
345 if err == nil && existingRepo != nil {
346 l.Info("repo exists")
347 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
348 return
349 }
350
351 // create atproto record for this repo
352 rkey := tid.TID()
353 repo := &db.Repo{
354 Did: user.Did,
355 Name: repoName,
356 Knot: domain,
357 Rkey: rkey,
358 Description: description,
359 }
360
361 xrpcClient, err := s.oauth.AuthorizedClient(r)
362 if err != nil {
363 l.Info("PDS write failed", "err", err)
364 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
365 return
366 }
367
368 createdAt := time.Now().Format(time.RFC3339)
369 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
370 Collection: tangled.RepoNSID,
371 Repo: user.Did,
372 Rkey: rkey,
373 Record: &lexutil.LexiconTypeDecoder{
374 Val: &tangled.Repo{
375 Knot: repo.Knot,
376 Name: repoName,
377 CreatedAt: createdAt,
378 Owner: user.Did,
379 }},
380 })
381 if err != nil {
382 l.Info("PDS write failed", "err", err)
383 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
384 return
385 }
386
387 aturi := atresp.Uri
388 l = l.With("aturi", aturi)
389 l.Info("wrote to PDS")
390
391 tx, err := s.db.BeginTx(r.Context(), nil)
392 if err != nil {
393 l.Info("txn failed", "err", err)
394 s.pages.Notice(w, "repo", "Failed to save repository information.")
395 return
396 }
397
398 // The rollback function reverts a few things on failure:
399 // - the pending txn
400 // - the ACLs
401 // - the atproto record created
402 rollback := func() {
403 err1 := tx.Rollback()
404 err2 := s.enforcer.E.LoadPolicy()
405 err3 := rollbackRecord(context.Background(), aturi, xrpcClient)
406
407 // ignore txn complete errors, this is okay
408 if errors.Is(err1, sql.ErrTxDone) {
409 err1 = nil
410 }
411
412 if errs := errors.Join(err1, err2, err3); errs != nil {
413 l.Error("failed to rollback changes", "errs", errs)
414 return
415 }
416 }
417 defer rollback()
418
419 client, err := s.oauth.ServiceClient(
420 r,
421 oauth.WithService(domain),
422 oauth.WithLxm(tangled.RepoCreateNSID),
423 oauth.WithDev(s.config.Core.Dev),
424 )
425 if err != nil {
426 l.Error("service auth failed", "err", err)
427 s.pages.Notice(w, "repo", "Failed to reach PDS.")
428 return
429 }
430
431 xe := tangled.RepoCreate(
432 r.Context(),
433 client,
434 &tangled.RepoCreate_Input{
435 Rkey: rkey,
436 },
437 )
438 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
439 l.Error("xrpc error", "xe", xe)
440 s.pages.Notice(w, "repo", err.Error())
441 return
442 }
443
444 err = db.AddRepo(tx, repo)
445 if err != nil {
446 l.Error("db write failed", "err", err)
447 s.pages.Notice(w, "repo", "Failed to save repository information.")
448 return
449 }
450
451 // acls
452 p, _ := securejoin.SecureJoin(user.Did, repoName)
453 err = s.enforcer.AddRepo(user.Did, domain, p)
454 if err != nil {
455 l.Error("acl setup failed", "err", err)
456 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
457 return
458 }
459
460 err = tx.Commit()
461 if err != nil {
462 l.Error("txn commit failed", "err", err)
463 http.Error(w, err.Error(), http.StatusInternalServerError)
464 return
465 }
466
467 err = s.enforcer.E.SavePolicy()
468 if err != nil {
469 l.Error("acl save failed", "err", err)
470 http.Error(w, err.Error(), http.StatusInternalServerError)
471 return
472 }
473
474 // reset the ATURI because the transaction completed successfully
475 aturi = ""
476
477 s.notifier.NewRepo(r.Context(), repo)
478 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName))
479 }
480}
481
482// this is used to rollback changes made to the PDS
483//
484// it is a no-op if the provided ATURI is empty
485func rollbackRecord(ctx context.Context, aturi string, xrpcc *xrpcclient.Client) error {
486 if aturi == "" {
487 return nil
488 }
489
490 parsed := syntax.ATURI(aturi)
491
492 collection := parsed.Collection().String()
493 repo := parsed.Authority().String()
494 rkey := parsed.RecordKey().String()
495
496 _, err := xrpcc.RepoDeleteRecord(ctx, &comatproto.RepoDeleteRecord_Input{
497 Collection: collection,
498 Repo: repo,
499 Rkey: rkey,
500 })
501 return err
502}