forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "fmt" 6 "log" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 lexutil "github.com/bluesky-social/indigo/lex/util" 14 securejoin "github.com/cyphar/filepath-securejoin" 15 "github.com/go-chi/chi/v5" 16 "github.com/posthog/posthog-go" 17 "tangled.sh/tangled.sh/core/api/tangled" 18 "tangled.sh/tangled.sh/core/appview" 19 "tangled.sh/tangled.sh/core/appview/cache" 20 "tangled.sh/tangled.sh/core/appview/cache/session" 21 "tangled.sh/tangled.sh/core/appview/config" 22 "tangled.sh/tangled.sh/core/appview/db" 23 "tangled.sh/tangled.sh/core/appview/notify" 24 "tangled.sh/tangled.sh/core/appview/oauth" 25 "tangled.sh/tangled.sh/core/appview/pages" 26 posthogService "tangled.sh/tangled.sh/core/appview/posthog" 27 "tangled.sh/tangled.sh/core/appview/reporesolver" 28 "tangled.sh/tangled.sh/core/eventconsumer" 29 "tangled.sh/tangled.sh/core/idresolver" 30 "tangled.sh/tangled.sh/core/jetstream" 31 "tangled.sh/tangled.sh/core/knotclient" 32 tlog "tangled.sh/tangled.sh/core/log" 33 "tangled.sh/tangled.sh/core/rbac" 34 "tangled.sh/tangled.sh/core/tid" 35) 36 37type State struct { 38 db *db.DB 39 notifier notify.Notifier 40 oauth *oauth.OAuth 41 enforcer *rbac.Enforcer 42 pages *pages.Pages 43 sess *session.SessionStore 44 idResolver *idresolver.Resolver 45 posthog posthog.Client 46 jc *jetstream.JetstreamClient 47 config *config.Config 48 repoResolver *reporesolver.RepoResolver 49 knotstream *eventconsumer.Consumer 50 spindlestream *eventconsumer.Consumer 51} 52 53func Make(ctx context.Context, config *config.Config) (*State, error) { 54 d, err := db.Make(config.Core.DbPath) 55 if err != nil { 56 return nil, fmt.Errorf("failed to create db: %w", err) 57 } 58 59 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 60 if err != nil { 61 return nil, fmt.Errorf("failed to create enforcer: %w", err) 62 } 63 64 pgs := pages.NewPages(config) 65 66 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 67 if err != nil { 68 log.Printf("failed to create redis resolver: %v", err) 69 res = idresolver.DefaultResolver() 70 } 71 72 cache := cache.New(config.Redis.Addr) 73 sess := session.New(cache) 74 75 oauth := oauth.NewOAuth(config, sess) 76 77 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create posthog client: %w", err) 80 } 81 82 repoResolver := reporesolver.New(config, enforcer, res, d) 83 84 wrapper := db.DbWrapper{d} 85 jc, err := jetstream.NewJetstreamClient( 86 config.Jetstream.Endpoint, 87 "appview", 88 []string{ 89 tangled.GraphFollowNSID, 90 tangled.FeedStarNSID, 91 tangled.PublicKeyNSID, 92 tangled.RepoArtifactNSID, 93 tangled.ActorProfileNSID, 94 tangled.SpindleMemberNSID, 95 tangled.SpindleNSID, 96 }, 97 nil, 98 slog.Default(), 99 wrapper, 100 false, 101 102 // in-memory filter is inapplicalble to appview so 103 // we'll never log dids anyway. 104 false, 105 ) 106 if err != nil { 107 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 108 } 109 110 ingester := appview.Ingester{ 111 Db: wrapper, 112 Enforcer: enforcer, 113 IdResolver: res, 114 Config: config, 115 Logger: tlog.New("ingester"), 116 } 117 err = jc.StartJetstream(ctx, ingester.Ingest()) 118 if err != nil { 119 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 120 } 121 122 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 123 if err != nil { 124 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 125 } 126 knotstream.Start(ctx) 127 128 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 129 if err != nil { 130 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 131 } 132 spindlestream.Start(ctx) 133 134 var notifiers []notify.Notifier 135 if !config.Core.Dev { 136 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog)) 137 } 138 notifier := notify.NewMergedNotifier(notifiers...) 139 140 state := &State{ 141 d, 142 notifier, 143 oauth, 144 enforcer, 145 pgs, 146 sess, 147 res, 148 posthog, 149 jc, 150 config, 151 repoResolver, 152 knotstream, 153 spindlestream, 154 } 155 156 return state, nil 157} 158 159func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 160 user := s.oauth.GetUser(r) 161 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 162 LoggedInUser: user, 163 }) 164} 165 166func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 167 user := s.oauth.GetUser(r) 168 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 169 LoggedInUser: user, 170 }) 171} 172 173func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 174 user := s.oauth.GetUser(r) 175 176 timeline, err := db.MakeTimeline(s.db) 177 if err != nil { 178 log.Println(err) 179 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 180 } 181 182 var didsToResolve []string 183 for _, ev := range timeline { 184 if ev.Repo != nil { 185 didsToResolve = append(didsToResolve, ev.Repo.Did) 186 if ev.Source != nil { 187 didsToResolve = append(didsToResolve, ev.Source.Did) 188 } 189 } 190 if ev.Follow != nil { 191 didsToResolve = append(didsToResolve, ev.Follow.UserDid, ev.Follow.SubjectDid) 192 } 193 if ev.Star != nil { 194 didsToResolve = append(didsToResolve, ev.Star.StarredByDid, ev.Star.Repo.Did) 195 } 196 } 197 198 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve) 199 didHandleMap := make(map[string]string) 200 for _, identity := range resolvedIds { 201 if !identity.Handle.IsInvalidHandle() { 202 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String()) 203 } else { 204 didHandleMap[identity.DID.String()] = identity.DID.String() 205 } 206 } 207 208 s.pages.Timeline(w, pages.TimelineParams{ 209 LoggedInUser: user, 210 Timeline: timeline, 211 DidHandleMap: didHandleMap, 212 }) 213 214 return 215} 216 217func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 218 user := chi.URLParam(r, "user") 219 user = strings.TrimPrefix(user, "@") 220 221 if user == "" { 222 w.WriteHeader(http.StatusBadRequest) 223 return 224 } 225 226 id, err := s.idResolver.ResolveIdent(r.Context(), user) 227 if err != nil { 228 w.WriteHeader(http.StatusInternalServerError) 229 return 230 } 231 232 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 233 if err != nil { 234 w.WriteHeader(http.StatusNotFound) 235 return 236 } 237 238 if len(pubKeys) == 0 { 239 w.WriteHeader(http.StatusNotFound) 240 return 241 } 242 243 for _, k := range pubKeys { 244 key := strings.TrimRight(k.Key, "\n") 245 w.Write([]byte(fmt.Sprintln(key))) 246 } 247} 248 249func validateRepoName(name string) error { 250 // check for path traversal attempts 251 if name == "." || name == ".." || 252 strings.Contains(name, "/") || strings.Contains(name, "\\") { 253 return fmt.Errorf("Repository name contains invalid path characters") 254 } 255 256 // check for sequences that could be used for traversal when normalized 257 if strings.Contains(name, "./") || strings.Contains(name, "../") || 258 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 259 return fmt.Errorf("Repository name contains invalid path sequence") 260 } 261 262 // then continue with character validation 263 for _, char := range name { 264 if !((char >= 'a' && char <= 'z') || 265 (char >= 'A' && char <= 'Z') || 266 (char >= '0' && char <= '9') || 267 char == '-' || char == '_' || char == '.') { 268 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 269 } 270 } 271 272 // additional check to prevent multiple sequential dots 273 if strings.Contains(name, "..") { 274 return fmt.Errorf("Repository name cannot contain sequential dots") 275 } 276 277 // if all checks pass 278 return nil 279} 280 281func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 282 switch r.Method { 283 case http.MethodGet: 284 user := s.oauth.GetUser(r) 285 knots, err := s.enforcer.GetKnotsForUser(user.Did) 286 if err != nil { 287 s.pages.Notice(w, "repo", "Invalid user account.") 288 return 289 } 290 291 s.pages.NewRepo(w, pages.NewRepoParams{ 292 LoggedInUser: user, 293 Knots: knots, 294 }) 295 296 case http.MethodPost: 297 user := s.oauth.GetUser(r) 298 299 domain := r.FormValue("domain") 300 if domain == "" { 301 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 302 return 303 } 304 305 repoName := r.FormValue("name") 306 if repoName == "" { 307 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 308 return 309 } 310 311 if err := validateRepoName(repoName); err != nil { 312 s.pages.Notice(w, "repo", err.Error()) 313 return 314 } 315 316 defaultBranch := r.FormValue("branch") 317 if defaultBranch == "" { 318 defaultBranch = "main" 319 } 320 321 description := r.FormValue("description") 322 323 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 324 if err != nil || !ok { 325 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 326 return 327 } 328 329 existingRepo, err := db.GetRepo(s.db, user.Did, repoName) 330 if err == nil && existingRepo != nil { 331 s.pages.Notice(w, "repo", fmt.Sprintf("A repo by this name already exists on %s", existingRepo.Knot)) 332 return 333 } 334 335 secret, err := db.GetRegistrationKey(s.db, domain) 336 if err != nil { 337 s.pages.Notice(w, "repo", fmt.Sprintf("No registration key found for knot %s.", domain)) 338 return 339 } 340 341 client, err := knotclient.NewSignedClient(domain, secret, s.config.Core.Dev) 342 if err != nil { 343 s.pages.Notice(w, "repo", "Failed to connect to knot server.") 344 return 345 } 346 347 rkey := tid.TID() 348 repo := &db.Repo{ 349 Did: user.Did, 350 Name: repoName, 351 Knot: domain, 352 Rkey: rkey, 353 Description: description, 354 } 355 356 xrpcClient, err := s.oauth.AuthorizedClient(r) 357 if err != nil { 358 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 359 return 360 } 361 362 createdAt := time.Now().Format(time.RFC3339) 363 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 364 Collection: tangled.RepoNSID, 365 Repo: user.Did, 366 Rkey: rkey, 367 Record: &lexutil.LexiconTypeDecoder{ 368 Val: &tangled.Repo{ 369 Knot: repo.Knot, 370 Name: repoName, 371 CreatedAt: createdAt, 372 Owner: user.Did, 373 }}, 374 }) 375 if err != nil { 376 log.Printf("failed to create record: %s", err) 377 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 378 return 379 } 380 log.Println("created repo record: ", atresp.Uri) 381 382 tx, err := s.db.BeginTx(r.Context(), nil) 383 if err != nil { 384 log.Println(err) 385 s.pages.Notice(w, "repo", "Failed to save repository information.") 386 return 387 } 388 defer func() { 389 tx.Rollback() 390 err = s.enforcer.E.LoadPolicy() 391 if err != nil { 392 log.Println("failed to rollback policies") 393 } 394 }() 395 396 resp, err := client.NewRepo(user.Did, repoName, defaultBranch) 397 if err != nil { 398 s.pages.Notice(w, "repo", "Failed to create repository on knot server.") 399 return 400 } 401 402 switch resp.StatusCode { 403 case http.StatusConflict: 404 s.pages.Notice(w, "repo", "A repository with that name already exists.") 405 return 406 case http.StatusInternalServerError: 407 s.pages.Notice(w, "repo", "Failed to create repository on knot. Try again later.") 408 case http.StatusNoContent: 409 // continue 410 } 411 412 repo.AtUri = atresp.Uri 413 err = db.AddRepo(tx, repo) 414 if err != nil { 415 log.Println(err) 416 s.pages.Notice(w, "repo", "Failed to save repository information.") 417 return 418 } 419 420 // acls 421 p, _ := securejoin.SecureJoin(user.Did, repoName) 422 err = s.enforcer.AddRepo(user.Did, domain, p) 423 if err != nil { 424 log.Println(err) 425 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 426 return 427 } 428 429 err = tx.Commit() 430 if err != nil { 431 log.Println("failed to commit changes", err) 432 http.Error(w, err.Error(), http.StatusInternalServerError) 433 return 434 } 435 436 err = s.enforcer.E.SavePolicy() 437 if err != nil { 438 log.Println("failed to update ACLs", err) 439 http.Error(w, err.Error(), http.StatusInternalServerError) 440 return 441 } 442 443 s.notifier.NewRepo(r.Context(), repo) 444 445 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 446 return 447 } 448}