forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at knot-xrpc 12 kB view raw
1package state 2 3import ( 4 "context" 5 "fmt" 6 "log" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 lexutil "github.com/bluesky-social/indigo/lex/util" 14 securejoin "github.com/cyphar/filepath-securejoin" 15 "github.com/go-chi/chi/v5" 16 "github.com/posthog/posthog-go" 17 "tangled.sh/tangled.sh/core/api/tangled" 18 "tangled.sh/tangled.sh/core/appview" 19 "tangled.sh/tangled.sh/core/appview/cache" 20 "tangled.sh/tangled.sh/core/appview/cache/session" 21 "tangled.sh/tangled.sh/core/appview/config" 22 "tangled.sh/tangled.sh/core/appview/db" 23 "tangled.sh/tangled.sh/core/appview/notify" 24 "tangled.sh/tangled.sh/core/appview/oauth" 25 "tangled.sh/tangled.sh/core/appview/pages" 26 posthogService "tangled.sh/tangled.sh/core/appview/posthog" 27 "tangled.sh/tangled.sh/core/appview/reporesolver" 28 "tangled.sh/tangled.sh/core/eventconsumer" 29 "tangled.sh/tangled.sh/core/idresolver" 30 "tangled.sh/tangled.sh/core/jetstream" 31 tlog "tangled.sh/tangled.sh/core/log" 32 "tangled.sh/tangled.sh/core/rbac" 33 "tangled.sh/tangled.sh/core/tid" 34 xrpcerr "tangled.sh/tangled.sh/core/xrpc/errors" 35) 36 37type State struct { 38 db *db.DB 39 notifier notify.Notifier 40 oauth *oauth.OAuth 41 enforcer *rbac.Enforcer 42 pages *pages.Pages 43 sess *session.SessionStore 44 idResolver *idresolver.Resolver 45 posthog posthog.Client 46 jc *jetstream.JetstreamClient 47 config *config.Config 48 repoResolver *reporesolver.RepoResolver 49 knotstream *eventconsumer.Consumer 50 spindlestream *eventconsumer.Consumer 51} 52 53func Make(ctx context.Context, config *config.Config) (*State, error) { 54 d, err := db.Make(config.Core.DbPath) 55 if err != nil { 56 return nil, fmt.Errorf("failed to create db: %w", err) 57 } 58 59 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 60 if err != nil { 61 return nil, fmt.Errorf("failed to create enforcer: %w", err) 62 } 63 64 pgs := pages.NewPages(config) 65 66 res, err := idresolver.RedisResolver(config.Redis.ToURL()) 67 if err != nil { 68 log.Printf("failed to create redis resolver: %v", err) 69 res = idresolver.DefaultResolver() 70 } 71 72 cache := cache.New(config.Redis.Addr) 73 sess := session.New(cache) 74 75 oauth := oauth.NewOAuth(config, sess) 76 77 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create posthog client: %w", err) 80 } 81 82 repoResolver := reporesolver.New(config, enforcer, res, d) 83 84 wrapper := db.DbWrapper{d} 85 jc, err := jetstream.NewJetstreamClient( 86 config.Jetstream.Endpoint, 87 "appview", 88 []string{ 89 tangled.GraphFollowNSID, 90 tangled.FeedStarNSID, 91 tangled.PublicKeyNSID, 92 tangled.RepoArtifactNSID, 93 tangled.ActorProfileNSID, 94 tangled.SpindleMemberNSID, 95 tangled.SpindleNSID, 96 tangled.StringNSID, 97 }, 98 nil, 99 slog.Default(), 100 wrapper, 101 false, 102 103 // in-memory filter is inapplicalble to appview so 104 // we'll never log dids anyway. 105 false, 106 ) 107 if err != nil { 108 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 109 } 110 111 ingester := appview.Ingester{ 112 Db: wrapper, 113 Enforcer: enforcer, 114 IdResolver: res, 115 Config: config, 116 Logger: tlog.New("ingester"), 117 } 118 err = jc.StartJetstream(ctx, ingester.Ingest()) 119 if err != nil { 120 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 121 } 122 123 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 124 if err != nil { 125 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 126 } 127 knotstream.Start(ctx) 128 129 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 130 if err != nil { 131 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 132 } 133 spindlestream.Start(ctx) 134 135 var notifiers []notify.Notifier 136 if !config.Core.Dev { 137 notifiers = append(notifiers, posthogService.NewPosthogNotifier(posthog)) 138 } 139 notifier := notify.NewMergedNotifier(notifiers...) 140 141 state := &State{ 142 d, 143 notifier, 144 oauth, 145 enforcer, 146 pgs, 147 sess, 148 res, 149 posthog, 150 jc, 151 config, 152 repoResolver, 153 knotstream, 154 spindlestream, 155 } 156 157 return state, nil 158} 159 160func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 161 user := s.oauth.GetUser(r) 162 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 163 LoggedInUser: user, 164 }) 165} 166 167func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 168 user := s.oauth.GetUser(r) 169 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 170 LoggedInUser: user, 171 }) 172} 173 174func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 175 user := s.oauth.GetUser(r) 176 177 timeline, err := db.MakeTimeline(s.db) 178 if err != nil { 179 log.Println(err) 180 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 181 } 182 183 var didsToResolve []string 184 for _, ev := range timeline { 185 if ev.Repo != nil { 186 didsToResolve = append(didsToResolve, ev.Repo.Did) 187 if ev.Source != nil { 188 didsToResolve = append(didsToResolve, ev.Source.Did) 189 } 190 } 191 if ev.Follow != nil { 192 didsToResolve = append(didsToResolve, ev.Follow.UserDid, ev.Follow.SubjectDid) 193 } 194 if ev.Star != nil { 195 didsToResolve = append(didsToResolve, ev.Star.StarredByDid, ev.Star.Repo.Did) 196 } 197 } 198 199 resolvedIds := s.idResolver.ResolveIdents(r.Context(), didsToResolve) 200 didHandleMap := make(map[string]string) 201 for _, identity := range resolvedIds { 202 if !identity.Handle.IsInvalidHandle() { 203 didHandleMap[identity.DID.String()] = fmt.Sprintf("@%s", identity.Handle.String()) 204 } else { 205 didHandleMap[identity.DID.String()] = identity.DID.String() 206 } 207 } 208 209 s.pages.Timeline(w, pages.TimelineParams{ 210 LoggedInUser: user, 211 Timeline: timeline, 212 DidHandleMap: didHandleMap, 213 }) 214 215 return 216} 217 218func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 219 user := chi.URLParam(r, "user") 220 user = strings.TrimPrefix(user, "@") 221 222 if user == "" { 223 w.WriteHeader(http.StatusBadRequest) 224 return 225 } 226 227 id, err := s.idResolver.ResolveIdent(r.Context(), user) 228 if err != nil { 229 w.WriteHeader(http.StatusInternalServerError) 230 return 231 } 232 233 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 234 if err != nil { 235 w.WriteHeader(http.StatusNotFound) 236 return 237 } 238 239 if len(pubKeys) == 0 { 240 w.WriteHeader(http.StatusNotFound) 241 return 242 } 243 244 for _, k := range pubKeys { 245 key := strings.TrimRight(k.Key, "\n") 246 w.Write([]byte(fmt.Sprintln(key))) 247 } 248} 249 250func validateRepoName(name string) error { 251 // check for path traversal attempts 252 if name == "." || name == ".." || 253 strings.Contains(name, "/") || strings.Contains(name, "\\") { 254 return fmt.Errorf("Repository name contains invalid path characters") 255 } 256 257 // check for sequences that could be used for traversal when normalized 258 if strings.Contains(name, "./") || strings.Contains(name, "../") || 259 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 260 return fmt.Errorf("Repository name contains invalid path sequence") 261 } 262 263 // then continue with character validation 264 for _, char := range name { 265 if !((char >= 'a' && char <= 'z') || 266 (char >= 'A' && char <= 'Z') || 267 (char >= '0' && char <= '9') || 268 char == '-' || char == '_' || char == '.') { 269 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 270 } 271 } 272 273 // additional check to prevent multiple sequential dots 274 if strings.Contains(name, "..") { 275 return fmt.Errorf("Repository name cannot contain sequential dots") 276 } 277 278 // if all checks pass 279 return nil 280} 281 282func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 283 switch r.Method { 284 case http.MethodGet: 285 user := s.oauth.GetUser(r) 286 knots, err := s.enforcer.GetKnotsForUser(user.Did) 287 if err != nil { 288 s.pages.Notice(w, "repo", "Invalid user account.") 289 return 290 } 291 292 s.pages.NewRepo(w, pages.NewRepoParams{ 293 LoggedInUser: user, 294 Knots: knots, 295 }) 296 297 case http.MethodPost: 298 user := s.oauth.GetUser(r) 299 300 domain := r.FormValue("domain") 301 if domain == "" { 302 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 303 return 304 } 305 306 repoName := r.FormValue("name") 307 if repoName == "" { 308 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 309 return 310 } 311 312 if err := validateRepoName(repoName); err != nil { 313 s.pages.Notice(w, "repo", err.Error()) 314 return 315 } 316 317 defaultBranch := r.FormValue("branch") 318 if defaultBranch == "" { 319 defaultBranch = "main" 320 } 321 322 description := r.FormValue("description") 323 324 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 325 if err != nil || !ok { 326 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 327 return 328 } 329 330 existingRepo, err := db.GetRepo(s.db, user.Did, repoName) 331 if err == nil && existingRepo != nil { 332 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 333 return 334 } 335 336 client, err := s.oauth.ServiceClient( 337 r, 338 oauth.WithService(domain), 339 oauth.WithLxm(tangled.RepoCreateNSID), 340 oauth.WithDev(s.config.Core.Dev), 341 ) 342 343 if err != nil { 344 s.pages.Notice(w, "repo", "Failed to connect to knot server.") 345 return 346 } 347 348 rkey := tid.TID() 349 repo := &db.Repo{ 350 Did: user.Did, 351 Name: repoName, 352 Knot: domain, 353 Rkey: rkey, 354 Description: description, 355 } 356 357 xrpcClient, err := s.oauth.AuthorizedClient(r) 358 if err != nil { 359 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 360 return 361 } 362 363 createdAt := time.Now().Format(time.RFC3339) 364 atresp, err := xrpcClient.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 365 Collection: tangled.RepoNSID, 366 Repo: user.Did, 367 Rkey: rkey, 368 Record: &lexutil.LexiconTypeDecoder{ 369 Val: &tangled.Repo{ 370 Knot: repo.Knot, 371 Name: repoName, 372 CreatedAt: createdAt, 373 Owner: user.Did, 374 }}, 375 }) 376 if err != nil { 377 log.Printf("failed to create record: %s", err) 378 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 379 return 380 } 381 log.Println("created repo record: ", atresp.Uri) 382 383 tx, err := s.db.BeginTx(r.Context(), nil) 384 if err != nil { 385 log.Println(err) 386 s.pages.Notice(w, "repo", "Failed to save repository information.") 387 return 388 } 389 defer func() { 390 tx.Rollback() 391 err = s.enforcer.E.LoadPolicy() 392 if err != nil { 393 log.Println("failed to rollback policies") 394 } 395 }() 396 397 err = tangled.RepoCreate( 398 r.Context(), 399 client, 400 &tangled.RepoCreate_Input{ 401 Default_branch: &defaultBranch, 402 Did: user.Did, 403 Name: repoName, 404 }, 405 ) 406 407 if err != nil { 408 xe, err := xrpcerr.Unmarshal(err.Error()) 409 if err != nil { 410 log.Println(err) 411 s.pages.Notice(w, "repo", "Failed to create repository on knot server.") 412 return 413 } 414 415 log.Println(xe.Error()) 416 s.pages.Notice(w, "repo", fmt.Sprintf("Failed to create repository on knot server: %s.", xe.Message)) 417 return 418 } 419 420 repo.AtUri = atresp.Uri 421 err = db.AddRepo(tx, repo) 422 if err != nil { 423 log.Println(err) 424 s.pages.Notice(w, "repo", "Failed to save repository information.") 425 return 426 } 427 428 // acls 429 p, _ := securejoin.SecureJoin(user.Did, repoName) 430 err = s.enforcer.AddRepo(user.Did, domain, p) 431 if err != nil { 432 log.Println(err) 433 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 434 return 435 } 436 437 err = tx.Commit() 438 if err != nil { 439 log.Println("failed to commit changes", err) 440 http.Error(w, err.Error(), http.StatusInternalServerError) 441 return 442 } 443 444 err = s.enforcer.E.SavePolicy() 445 if err != nil { 446 log.Println("failed to update ACLs", err) 447 http.Error(w, err.Error(), http.StatusInternalServerError) 448 return 449 } 450 451 s.notifier.NewRepo(r.Context(), repo) 452 453 s.pages.HxLocation(w, fmt.Sprintf("/@%s/%s", user.Handle, repoName)) 454 return 455 } 456}