forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package knots 2 3import ( 4 "errors" 5 "fmt" 6 "log/slog" 7 "net/http" 8 "slices" 9 "strings" 10 "time" 11 12 "github.com/go-chi/chi/v5" 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/config" 15 "tangled.org/core/appview/db" 16 "tangled.org/core/appview/middleware" 17 "tangled.org/core/appview/models" 18 "tangled.org/core/appview/oauth" 19 "tangled.org/core/appview/pages" 20 "tangled.org/core/appview/serververify" 21 "tangled.org/core/appview/xrpcclient" 22 "tangled.org/core/eventconsumer" 23 "tangled.org/core/idresolver" 24 "tangled.org/core/rbac" 25 "tangled.org/core/tid" 26 27 comatproto "github.com/bluesky-social/indigo/api/atproto" 28 lexutil "github.com/bluesky-social/indigo/lex/util" 29) 30 31type Knots struct { 32 Db *db.DB 33 OAuth *oauth.OAuth 34 Pages *pages.Pages 35 Config *config.Config 36 Enforcer *rbac.Enforcer 37 IdResolver *idresolver.Resolver 38 Logger *slog.Logger 39 Knotstream *eventconsumer.Consumer 40} 41 42func (k *Knots) Router() http.Handler { 43 r := chi.NewRouter() 44 45 r.With(middleware.AuthMiddleware(k.OAuth)).Get("/", k.knots) 46 r.With(middleware.AuthMiddleware(k.OAuth)).Post("/register", k.register) 47 48 r.With(middleware.AuthMiddleware(k.OAuth)).Get("/{domain}", k.dashboard) 49 r.With(middleware.AuthMiddleware(k.OAuth)).Delete("/{domain}", k.delete) 50 51 r.With(middleware.AuthMiddleware(k.OAuth)).Post("/{domain}/retry", k.retry) 52 r.With(middleware.AuthMiddleware(k.OAuth)).Post("/{domain}/add", k.addMember) 53 r.With(middleware.AuthMiddleware(k.OAuth)).Post("/{domain}/remove", k.removeMember) 54 55 return r 56} 57 58func (k *Knots) knots(w http.ResponseWriter, r *http.Request) { 59 user := k.OAuth.GetUser(r) 60 registrations, err := db.GetRegistrations( 61 k.Db, 62 db.FilterEq("did", user.Did), 63 ) 64 if err != nil { 65 k.Logger.Error("failed to fetch knot registrations", "err", err) 66 w.WriteHeader(http.StatusInternalServerError) 67 return 68 } 69 70 k.Pages.Knots(w, pages.KnotsParams{ 71 LoggedInUser: user, 72 Registrations: registrations, 73 }) 74} 75 76func (k *Knots) dashboard(w http.ResponseWriter, r *http.Request) { 77 l := k.Logger.With("handler", "dashboard") 78 79 user := k.OAuth.GetUser(r) 80 l = l.With("user", user.Did) 81 82 domain := chi.URLParam(r, "domain") 83 if domain == "" { 84 return 85 } 86 l = l.With("domain", domain) 87 88 registrations, err := db.GetRegistrations( 89 k.Db, 90 db.FilterEq("did", user.Did), 91 db.FilterEq("domain", domain), 92 ) 93 if err != nil { 94 l.Error("failed to get registrations", "err", err) 95 http.Error(w, "Not found", http.StatusNotFound) 96 return 97 } 98 if len(registrations) != 1 { 99 l.Error("got incorret number of registrations", "got", len(registrations), "expected", 1) 100 return 101 } 102 registration := registrations[0] 103 104 members, err := k.Enforcer.GetUserByRole("server:member", domain) 105 if err != nil { 106 l.Error("failed to get knot members", "err", err) 107 http.Error(w, "Not found", http.StatusInternalServerError) 108 return 109 } 110 slices.Sort(members) 111 112 repos, err := db.GetRepos( 113 k.Db, 114 0, 115 db.FilterEq("knot", domain), 116 ) 117 if err != nil { 118 l.Error("failed to get knot repos", "err", err) 119 http.Error(w, "Not found", http.StatusInternalServerError) 120 return 121 } 122 123 // organize repos by did 124 repoMap := make(map[string][]models.Repo) 125 for _, r := range repos { 126 repoMap[r.Did] = append(repoMap[r.Did], r) 127 } 128 129 k.Pages.Knot(w, pages.KnotParams{ 130 LoggedInUser: user, 131 Registration: &registration, 132 Members: members, 133 Repos: repoMap, 134 IsOwner: true, 135 }) 136} 137 138func (k *Knots) register(w http.ResponseWriter, r *http.Request) { 139 user := k.OAuth.GetUser(r) 140 l := k.Logger.With("handler", "register") 141 142 noticeId := "register-error" 143 defaultErr := "Failed to register knot. Try again later." 144 fail := func() { 145 k.Pages.Notice(w, noticeId, defaultErr) 146 } 147 148 domain := r.FormValue("domain") 149 // Strip protocol, trailing slashes, and whitespace 150 // Rkey cannot contain slashes 151 domain = strings.TrimSpace(domain) 152 domain = strings.TrimPrefix(domain, "https://") 153 domain = strings.TrimPrefix(domain, "http://") 154 domain = strings.TrimSuffix(domain, "/") 155 if domain == "" { 156 k.Pages.Notice(w, noticeId, "Incomplete form.") 157 return 158 } 159 l = l.With("domain", domain) 160 l = l.With("user", user.Did) 161 162 tx, err := k.Db.Begin() 163 if err != nil { 164 l.Error("failed to start transaction", "err", err) 165 fail() 166 return 167 } 168 defer func() { 169 tx.Rollback() 170 k.Enforcer.E.LoadPolicy() 171 }() 172 173 err = db.AddKnot(tx, domain, user.Did) 174 if err != nil { 175 l.Error("failed to insert", "err", err) 176 fail() 177 return 178 } 179 180 err = k.Enforcer.AddKnot(domain) 181 if err != nil { 182 l.Error("failed to create knot", "err", err) 183 fail() 184 return 185 } 186 187 // create record on pds 188 client, err := k.OAuth.AuthorizedClient(r) 189 if err != nil { 190 l.Error("failed to authorize client", "err", err) 191 fail() 192 return 193 } 194 195 ex, _ := comatproto.RepoGetRecord(r.Context(), client, "", tangled.KnotNSID, user.Did, domain) 196 var exCid *string 197 if ex != nil { 198 exCid = ex.Cid 199 } 200 201 // re-announce by registering under same rkey 202 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 203 Collection: tangled.KnotNSID, 204 Repo: user.Did, 205 Rkey: domain, 206 Record: &lexutil.LexiconTypeDecoder{ 207 Val: &tangled.Knot{ 208 CreatedAt: time.Now().Format(time.RFC3339), 209 }, 210 }, 211 SwapRecord: exCid, 212 }) 213 214 if err != nil { 215 l.Error("failed to put record", "err", err) 216 fail() 217 return 218 } 219 220 err = tx.Commit() 221 if err != nil { 222 l.Error("failed to commit transaction", "err", err) 223 fail() 224 return 225 } 226 227 err = k.Enforcer.E.SavePolicy() 228 if err != nil { 229 l.Error("failed to update ACL", "err", err) 230 k.Pages.HxRefresh(w) 231 return 232 } 233 234 // begin verification 235 err = serververify.RunVerification(r.Context(), domain, user.Did, k.Config.Core.Dev) 236 if err != nil { 237 l.Error("verification failed", "err", err) 238 k.Pages.HxRefresh(w) 239 return 240 } 241 242 err = serververify.MarkKnotVerified(k.Db, k.Enforcer, domain, user.Did) 243 if err != nil { 244 l.Error("failed to mark verified", "err", err) 245 k.Pages.HxRefresh(w) 246 return 247 } 248 249 // add this knot to knotstream 250 go k.Knotstream.AddSource( 251 r.Context(), 252 eventconsumer.NewKnotSource(domain), 253 ) 254 255 // ok 256 k.Pages.HxRefresh(w) 257} 258 259func (k *Knots) delete(w http.ResponseWriter, r *http.Request) { 260 user := k.OAuth.GetUser(r) 261 l := k.Logger.With("handler", "delete") 262 263 noticeId := "operation-error" 264 defaultErr := "Failed to delete knot. Try again later." 265 fail := func() { 266 k.Pages.Notice(w, noticeId, defaultErr) 267 } 268 269 domain := chi.URLParam(r, "domain") 270 if domain == "" { 271 l.Error("empty domain") 272 fail() 273 return 274 } 275 276 // get record from db first 277 registrations, err := db.GetRegistrations( 278 k.Db, 279 db.FilterEq("did", user.Did), 280 db.FilterEq("domain", domain), 281 ) 282 if err != nil { 283 l.Error("failed to get registration", "err", err) 284 fail() 285 return 286 } 287 if len(registrations) != 1 { 288 l.Error("got incorret number of registrations", "got", len(registrations), "expected", 1) 289 fail() 290 return 291 } 292 registration := registrations[0] 293 294 tx, err := k.Db.Begin() 295 if err != nil { 296 l.Error("failed to start txn", "err", err) 297 fail() 298 return 299 } 300 defer func() { 301 tx.Rollback() 302 k.Enforcer.E.LoadPolicy() 303 }() 304 305 err = db.DeleteKnot( 306 tx, 307 db.FilterEq("did", user.Did), 308 db.FilterEq("domain", domain), 309 ) 310 if err != nil { 311 l.Error("failed to delete registration", "err", err) 312 fail() 313 return 314 } 315 316 // delete from enforcer if it was registered 317 if registration.Registered != nil { 318 err = k.Enforcer.RemoveKnot(domain) 319 if err != nil { 320 l.Error("failed to update ACL", "err", err) 321 fail() 322 return 323 } 324 } 325 326 client, err := k.OAuth.AuthorizedClient(r) 327 if err != nil { 328 l.Error("failed to authorize client", "err", err) 329 fail() 330 return 331 } 332 333 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 334 Collection: tangled.KnotNSID, 335 Repo: user.Did, 336 Rkey: domain, 337 }) 338 if err != nil { 339 // non-fatal 340 l.Error("failed to delete record", "err", err) 341 } 342 343 err = tx.Commit() 344 if err != nil { 345 l.Error("failed to delete knot", "err", err) 346 fail() 347 return 348 } 349 350 err = k.Enforcer.E.SavePolicy() 351 if err != nil { 352 l.Error("failed to update ACL", "err", err) 353 k.Pages.HxRefresh(w) 354 return 355 } 356 357 shouldRedirect := r.Header.Get("shouldRedirect") 358 if shouldRedirect == "true" { 359 k.Pages.HxRedirect(w, "/knots") 360 return 361 } 362 363 w.Write([]byte{}) 364} 365 366func (k *Knots) retry(w http.ResponseWriter, r *http.Request) { 367 user := k.OAuth.GetUser(r) 368 l := k.Logger.With("handler", "retry") 369 370 noticeId := "operation-error" 371 defaultErr := "Failed to verify knot. Try again later." 372 fail := func() { 373 k.Pages.Notice(w, noticeId, defaultErr) 374 } 375 376 domain := chi.URLParam(r, "domain") 377 if domain == "" { 378 l.Error("empty domain") 379 fail() 380 return 381 } 382 l = l.With("domain", domain) 383 l = l.With("user", user.Did) 384 385 // get record from db first 386 registrations, err := db.GetRegistrations( 387 k.Db, 388 db.FilterEq("did", user.Did), 389 db.FilterEq("domain", domain), 390 ) 391 if err != nil { 392 l.Error("failed to get registration", "err", err) 393 fail() 394 return 395 } 396 if len(registrations) != 1 { 397 l.Error("got incorret number of registrations", "got", len(registrations), "expected", 1) 398 fail() 399 return 400 } 401 registration := registrations[0] 402 403 // begin verification 404 err = serververify.RunVerification(r.Context(), domain, user.Did, k.Config.Core.Dev) 405 if err != nil { 406 l.Error("verification failed", "err", err) 407 408 if errors.Is(err, xrpcclient.ErrXrpcUnsupported) { 409 k.Pages.Notice(w, noticeId, "Failed to verify knot, XRPC queries are unsupported on this knot, consider upgrading!") 410 return 411 } 412 413 if e, ok := err.(*serververify.OwnerMismatch); ok { 414 k.Pages.Notice(w, noticeId, e.Error()) 415 return 416 } 417 418 fail() 419 return 420 } 421 422 err = serververify.MarkKnotVerified(k.Db, k.Enforcer, domain, user.Did) 423 if err != nil { 424 l.Error("failed to mark verified", "err", err) 425 k.Pages.Notice(w, noticeId, err.Error()) 426 return 427 } 428 429 // if this knot requires upgrade, then emit a record too 430 // 431 // this is part of migrating from the old knot system to the new one 432 if registration.NeedsUpgrade { 433 // re-announce by registering under same rkey 434 client, err := k.OAuth.AuthorizedClient(r) 435 if err != nil { 436 l.Error("failed to authorize client", "err", err) 437 fail() 438 return 439 } 440 441 ex, _ := comatproto.RepoGetRecord(r.Context(), client, "", tangled.KnotNSID, user.Did, domain) 442 var exCid *string 443 if ex != nil { 444 exCid = ex.Cid 445 } 446 447 // ignore the error here 448 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 449 Collection: tangled.KnotNSID, 450 Repo: user.Did, 451 Rkey: domain, 452 Record: &lexutil.LexiconTypeDecoder{ 453 Val: &tangled.Knot{ 454 CreatedAt: time.Now().Format(time.RFC3339), 455 }, 456 }, 457 SwapRecord: exCid, 458 }) 459 if err != nil { 460 l.Error("non-fatal: failed to reannouce knot", "err", err) 461 } 462 } 463 464 // add this knot to knotstream 465 go k.Knotstream.AddSource( 466 r.Context(), 467 eventconsumer.NewKnotSource(domain), 468 ) 469 470 shouldRefresh := r.Header.Get("shouldRefresh") 471 if shouldRefresh == "true" { 472 k.Pages.HxRefresh(w) 473 return 474 } 475 476 // Get updated registration to show 477 registrations, err = db.GetRegistrations( 478 k.Db, 479 db.FilterEq("did", user.Did), 480 db.FilterEq("domain", domain), 481 ) 482 if err != nil { 483 l.Error("failed to get registration", "err", err) 484 fail() 485 return 486 } 487 if len(registrations) != 1 { 488 l.Error("got incorret number of registrations", "got", len(registrations), "expected", 1) 489 fail() 490 return 491 } 492 updatedRegistration := registrations[0] 493 494 w.Header().Set("HX-Reswap", "outerHTML") 495 k.Pages.KnotListing(w, pages.KnotListingParams{ 496 Registration: &updatedRegistration, 497 }) 498} 499 500func (k *Knots) addMember(w http.ResponseWriter, r *http.Request) { 501 user := k.OAuth.GetUser(r) 502 l := k.Logger.With("handler", "addMember") 503 504 domain := chi.URLParam(r, "domain") 505 if domain == "" { 506 l.Error("empty domain") 507 http.Error(w, "Not found", http.StatusNotFound) 508 return 509 } 510 l = l.With("domain", domain) 511 l = l.With("user", user.Did) 512 513 registrations, err := db.GetRegistrations( 514 k.Db, 515 db.FilterEq("did", user.Did), 516 db.FilterEq("domain", domain), 517 db.FilterIsNot("registered", "null"), 518 ) 519 if err != nil { 520 l.Error("failed to get registration", "err", err) 521 return 522 } 523 if len(registrations) != 1 { 524 l.Error("got incorret number of registrations", "got", len(registrations), "expected", 1) 525 return 526 } 527 registration := registrations[0] 528 529 noticeId := fmt.Sprintf("add-member-error-%d", registration.Id) 530 defaultErr := "Failed to add member. Try again later." 531 fail := func() { 532 k.Pages.Notice(w, noticeId, defaultErr) 533 } 534 535 member := r.FormValue("member") 536 member = strings.TrimPrefix(member, "@") 537 if member == "" { 538 l.Error("empty member") 539 k.Pages.Notice(w, noticeId, "Failed to add member, empty form.") 540 return 541 } 542 l = l.With("member", member) 543 544 memberId, err := k.IdResolver.ResolveIdent(r.Context(), member) 545 if err != nil { 546 l.Error("failed to resolve member identity to handle", "err", err) 547 k.Pages.Notice(w, noticeId, "Failed to add member, identity resolution failed.") 548 return 549 } 550 if memberId.Handle.IsInvalidHandle() { 551 l.Error("failed to resolve member identity to handle") 552 k.Pages.Notice(w, noticeId, "Failed to add member, identity resolution failed.") 553 return 554 } 555 556 // write to pds 557 client, err := k.OAuth.AuthorizedClient(r) 558 if err != nil { 559 l.Error("failed to authorize client", "err", err) 560 fail() 561 return 562 } 563 564 rkey := tid.TID() 565 566 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 567 Collection: tangled.KnotMemberNSID, 568 Repo: user.Did, 569 Rkey: rkey, 570 Record: &lexutil.LexiconTypeDecoder{ 571 Val: &tangled.KnotMember{ 572 CreatedAt: time.Now().Format(time.RFC3339), 573 Domain: domain, 574 Subject: memberId.DID.String(), 575 }, 576 }, 577 }) 578 if err != nil { 579 l.Error("failed to add record to PDS", "err", err) 580 k.Pages.Notice(w, noticeId, "Failed to add record to PDS, try again later.") 581 return 582 } 583 584 err = k.Enforcer.AddKnotMember(domain, memberId.DID.String()) 585 if err != nil { 586 l.Error("failed to add member to ACLs", "err", err) 587 fail() 588 return 589 } 590 591 err = k.Enforcer.E.SavePolicy() 592 if err != nil { 593 l.Error("failed to save ACL policy", "err", err) 594 fail() 595 return 596 } 597 598 // success 599 k.Pages.HxRedirect(w, fmt.Sprintf("/knots/%s", domain)) 600} 601 602func (k *Knots) removeMember(w http.ResponseWriter, r *http.Request) { 603 user := k.OAuth.GetUser(r) 604 l := k.Logger.With("handler", "removeMember") 605 606 noticeId := "operation-error" 607 defaultErr := "Failed to remove member. Try again later." 608 fail := func() { 609 k.Pages.Notice(w, noticeId, defaultErr) 610 } 611 612 domain := chi.URLParam(r, "domain") 613 if domain == "" { 614 l.Error("empty domain") 615 fail() 616 return 617 } 618 l = l.With("domain", domain) 619 l = l.With("user", user.Did) 620 621 registrations, err := db.GetRegistrations( 622 k.Db, 623 db.FilterEq("did", user.Did), 624 db.FilterEq("domain", domain), 625 db.FilterIsNot("registered", "null"), 626 ) 627 if err != nil { 628 l.Error("failed to get registration", "err", err) 629 return 630 } 631 if len(registrations) != 1 { 632 l.Error("got incorret number of registrations", "got", len(registrations), "expected", 1) 633 return 634 } 635 636 member := r.FormValue("member") 637 member = strings.TrimPrefix(member, "@") 638 if member == "" { 639 l.Error("empty member") 640 k.Pages.Notice(w, noticeId, "Failed to remove member, empty form.") 641 return 642 } 643 l = l.With("member", member) 644 645 memberId, err := k.IdResolver.ResolveIdent(r.Context(), member) 646 if err != nil { 647 l.Error("failed to resolve member identity to handle", "err", err) 648 k.Pages.Notice(w, noticeId, "Failed to remove member, identity resolution failed.") 649 return 650 } 651 if memberId.Handle.IsInvalidHandle() { 652 l.Error("failed to resolve member identity to handle") 653 k.Pages.Notice(w, noticeId, "Failed to remove member, identity resolution failed.") 654 return 655 } 656 657 // remove from enforcer 658 err = k.Enforcer.RemoveKnotMember(domain, memberId.DID.String()) 659 if err != nil { 660 l.Error("failed to update ACLs", "err", err) 661 fail() 662 return 663 } 664 665 client, err := k.OAuth.AuthorizedClient(r) 666 if err != nil { 667 l.Error("failed to authorize client", "err", err) 668 fail() 669 return 670 } 671 672 // TODO: We need to track the rkey for knot members to delete the record 673 // For now, just remove from ACLs 674 _ = client 675 676 // commit everything 677 err = k.Enforcer.E.SavePolicy() 678 if err != nil { 679 l.Error("failed to save ACLs", "err", err) 680 fail() 681 return 682 } 683 684 // ok 685 k.Pages.HxRefresh(w) 686}