An atproto PDS written in Go

Compare changes

Choose any two refs to compare.

+5 -5
README.md
···
### Identity
-
- [ ] `com.atproto.identity.getRecommendedDidCredentials`
-
- [ ] `com.atproto.identity.requestPlcOperationSignature`
+
- [x] `com.atproto.identity.getRecommendedDidCredentials`
+
- [x] `com.atproto.identity.requestPlcOperationSignature`
- [x] `com.atproto.identity.resolveHandle`
-
- [ ] `com.atproto.identity.signPlcOperation`
-
- [ ] `com.atproto.identity.submitPlcOperation`
+
- [x] `com.atproto.identity.signPlcOperation`
+
- [x] `com.atproto.identity.submitPlcOperation`
- [x] `com.atproto.identity.updateHandle`
### Repo
···
- [x] `com.atproto.repo.deleteRecord`
- [x] `com.atproto.repo.describeRepo`
- [x] `com.atproto.repo.getRecord`
-
- [x] `com.atproto.repo.importRepo` (Works "okay". You still have to handle PLC operations on your own when migrating. Use with extreme caution.)
+
- [x] `com.atproto.repo.importRepo` (Works "okay". Use with extreme caution.)
- [x] `com.atproto.repo.listRecords`
- [ ] `com.atproto.repo.listMissingBlobs`
+2
models/models.go
···
EmailUpdateCodeExpiresAt *time.Time
PasswordResetCode *string
PasswordResetCodeExpiresAt *time.Time
+
PlcOperationCode *string
+
PlcOperationCodeExpiresAt *time.Time
Password string
SigningKey []byte
Rev string
+44 -28
oauth/client/manager.go
···
cli *http.Client
logger *slog.Logger
jwksCache cache.Cache[string, jwk.Key]
-
metadataCache cache.Cache[string, Metadata]
+
metadataCache cache.Cache[string, *Metadata]
}
type ManagerArgs struct {
···
}
jwksCache := cache.NewCache[string, jwk.Key]().WithLRU().WithMaxKeys(500).WithTTL(5 * time.Minute)
-
metadataCache := cache.NewCache[string, Metadata]().WithLRU().WithMaxKeys(500).WithTTL(5 * time.Minute)
+
metadataCache := cache.NewCache[string, *Metadata]().WithLRU().WithMaxKeys(500).WithTTL(5 * time.Minute)
return &Manager{
cli: args.Cli,
···
}
var jwks jwk.Key
-
if metadata.JWKS != nil && len(metadata.JWKS.Keys) > 0 {
-
// TODO: this is kinda bad but whatever for now. there could obviously be more than one jwk, and we need to
-
// make sure we use the right one
-
b, err := json.Marshal(metadata.JWKS.Keys[0])
-
if err != nil {
-
return nil, err
-
}
+
if metadata.TokenEndpointAuthMethod == "private_key_jwt" {
+
if metadata.JWKS != nil && len(metadata.JWKS.Keys) > 0 {
+
// TODO: this is kinda bad but whatever for now. there could obviously be more than one jwk, and we need to
+
// make sure we use the right one
+
b, err := json.Marshal(metadata.JWKS.Keys[0])
+
if err != nil {
+
return nil, err
+
}
-
k, err := helpers.ParseJWKFromBytes(b)
-
if err != nil {
-
return nil, err
-
}
+
k, err := helpers.ParseJWKFromBytes(b)
+
if err != nil {
+
return nil, err
+
}
-
jwks = k
-
} else if metadata.JWKSURI != nil {
-
maybeJwks, err := cm.getClientJwks(ctx, clientId, *metadata.JWKSURI)
-
if err != nil {
-
return nil, err
-
}
+
jwks = k
+
} else if metadata.JWKS != nil {
+
} else if metadata.JWKSURI != nil {
+
maybeJwks, err := cm.getClientJwks(ctx, clientId, *metadata.JWKSURI)
+
if err != nil {
+
return nil, err
+
}
-
jwks = maybeJwks
-
} else {
-
return nil, fmt.Errorf("no valid jwks found in oauth client metadata")
+
jwks = maybeJwks
+
} else {
+
return nil, fmt.Errorf("no valid jwks found in oauth client metadata")
+
}
}
return &Client{
···
}
func (cm *Manager) getClientMetadata(ctx context.Context, clientId string) (*Metadata, error) {
-
metadataCached, ok := cm.metadataCache.Get(clientId)
+
cached, ok := cm.metadataCache.Get(clientId)
if !ok {
req, err := http.NewRequestWithContext(ctx, "GET", clientId, nil)
if err != nil {
···
return nil, err
}
+
cm.metadataCache.Set(clientId, validated, 10*time.Minute)
+
return validated, nil
} else {
-
return &metadataCached, nil
+
return cached, nil
}
}
···
return nil, fmt.Errorf("error unmarshaling metadata: %w", err)
}
+
if metadata.ClientURI == "" {
+
u, err := url.Parse(metadata.ClientID)
+
if err != nil {
+
return nil, fmt.Errorf("unable to parse client id: %w", err)
+
}
+
u.RawPath = ""
+
u.RawQuery = ""
+
metadata.ClientURI = u.String()
+
}
+
u, err := url.Parse(metadata.ClientURI)
if err != nil {
return nil, fmt.Errorf("unable to parse client uri: %w", err)
}
+
if metadata.ClientName == "" {
+
metadata.ClientName = metadata.ClientURI
+
}
+
if isLocalHostname(u.Hostname()) {
-
return nil, errors.New("`client_uri` hostname is invalid")
+
return nil, fmt.Errorf("`client_uri` hostname is invalid: %s", u.Hostname())
}
if metadata.Scope == "" {
···
if u.Scheme != "http" {
return nil, fmt.Errorf("loopback redirect uri %s must use http", ruri)
}
-
-
break
case u.Scheme == "http":
return nil, errors.New("only loopbvack redirect uris are allowed to use the `http` scheme")
case u.Scheme == "https":
if isLocalHostname(u.Hostname()) {
return nil, fmt.Errorf("redirect uri %s's domain must not be a local hostname", ruri)
}
-
break
case strings.Contains(u.Scheme, "."):
if metadata.ApplicationType != "native" {
return nil, errors.New("private-use uri scheme redirect uris are only allowed for native apps")
+31 -15
plc/client.go
···
}
func (c *Client) CreateDID(sigkey *atcrypto.PrivateKeyK256, recovery string, handle string) (string, *Operation, error) {
-
pubsigkey, err := sigkey.PublicKey()
+
creds, err := c.CreateDidCredentials(sigkey, recovery, handle)
if err != nil {
return "", nil, err
}
-
pubrotkey, err := c.rotationKey.PublicKey()
+
op := Operation{
+
Type: "plc_operation",
+
VerificationMethods: creds.VerificationMethods,
+
RotationKeys: creds.RotationKeys,
+
AlsoKnownAs: creds.AlsoKnownAs,
+
Services: creds.Services,
+
Prev: nil,
+
}
+
+
if err := c.SignOp(sigkey, &op); err != nil {
+
return "", nil, err
+
}
+
+
did, err := DidFromOp(&op)
if err != nil {
return "", nil, err
}
+
return did, &op, nil
+
}
+
+
func (c *Client) CreateDidCredentials(sigkey *atcrypto.PrivateKeyK256, recovery string, handle string) (*DidCredentials, error) {
+
pubsigkey, err := sigkey.PublicKey()
+
if err != nil {
+
return nil, err
+
}
+
+
pubrotkey, err := c.rotationKey.PublicKey()
+
if err != nil {
+
return nil, err
+
}
+
// todo
rotationKeys := []string{pubrotkey.DIDKey()}
if recovery != "" {
···
}(recovery)
}
-
op := Operation{
-
Type: "plc_operation",
+
creds := DidCredentials{
VerificationMethods: map[string]string{
"atproto": pubsigkey.DIDKey(),
},
···
Endpoint: "https://" + c.pdsHostname,
},
},
-
Prev: nil,
}
-
if err := c.SignOp(sigkey, &op); err != nil {
-
return "", nil, err
-
}
-
-
did, err := DidFromOp(&op)
-
if err != nil {
-
return "", nil, err
-
}
-
-
return did, &op, nil
+
return &creds, nil
}
func (c *Client) SignOp(sigkey *atcrypto.PrivateKeyK256, op *Operation) error {
+8
plc/types.go
···
cbg "github.com/whyrusleeping/cbor-gen"
)
+
+
type DidCredentials struct {
+
VerificationMethods map[string]string `json:"verificationMethods"`
+
RotationKeys []string `json:"rotationKeys"`
+
AlsoKnownAs []string `json:"alsoKnownAs"`
+
Services map[string]identity.OperationService `json:"services"`
+
}
+
type Operation struct {
Type string `json:"type"`
VerificationMethods map[string]string `json:"verificationMethods"`
+1
server/handle_account.go
···
func (s *Server) handleAccount(e echo.Context) error {
ctx := e.Request().Context()
+
repo, sess, err := s.getSessionRepoOrErr(e)
if err != nil {
return e.Redirect(303, "/account/signin")
+29
server/handle_identity_request_plc_operation.go
···
+
package server
+
+
import (
+
"fmt"
+
"time"
+
+
"github.com/haileyok/cocoon/internal/helpers"
+
"github.com/haileyok/cocoon/models"
+
"github.com/labstack/echo/v4"
+
)
+
+
func (s *Server) handleIdentityRequestPlcOperationSignature(e echo.Context) error {
+
urepo := e.Get("repo").(*models.RepoActor)
+
+
code := fmt.Sprintf("%s-%s", helpers.RandomVarchar(5), helpers.RandomVarchar(5))
+
eat := time.Now().Add(10 * time.Minute).UTC()
+
+
if err := s.db.Exec("UPDATE repos SET plc_operation_code = ?, plc_operation_code_expires_at = ? WHERE did = ?", nil, code, eat, urepo.Repo.Did).Error; err != nil {
+
s.logger.Error("error updating user", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
if err := s.sendPlcTokenReset(urepo.Email, urepo.Handle, code); err != nil {
+
s.logger.Error("error sending mail", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
return e.NoContent(200)
+
}
+103
server/handle_identity_sign_plc_operation.go
···
+
package server
+
+
import (
+
"context"
+
"strings"
+
"time"
+
+
"github.com/Azure/go-autorest/autorest/to"
+
"github.com/bluesky-social/indigo/atproto/atcrypto"
+
"github.com/haileyok/cocoon/identity"
+
"github.com/haileyok/cocoon/internal/helpers"
+
"github.com/haileyok/cocoon/models"
+
"github.com/haileyok/cocoon/plc"
+
"github.com/labstack/echo/v4"
+
)
+
+
type ComAtprotoSignPlcOperationRequest struct {
+
Token string `json:"token"`
+
VerificationMethods *map[string]string `json:"verificationMethods"`
+
RotationKeys *[]string `json:"rotationKeys"`
+
AlsoKnownAs *[]string `json:"alsoKnownAs"`
+
Services *map[string]identity.OperationService `json:"services"`
+
}
+
+
type ComAtprotoSignPlcOperationResponse struct {
+
Operation plc.Operation `json:"operation"`
+
}
+
+
func (s *Server) handleSignPlcOperation(e echo.Context) error {
+
repo := e.Get("repo").(*models.RepoActor)
+
+
var req ComAtprotoSignPlcOperationRequest
+
if err := e.Bind(&req); err != nil {
+
s.logger.Error("error binding", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
if !strings.HasPrefix(repo.Repo.Did, "did:plc:") {
+
return helpers.InputError(e, nil)
+
}
+
+
if repo.PlcOperationCode == nil || repo.PlcOperationCodeExpiresAt == nil {
+
return helpers.InputError(e, to.StringPtr("InvalidToken"))
+
}
+
+
if *repo.PlcOperationCode != req.Token {
+
return helpers.InvalidTokenError(e)
+
}
+
+
if time.Now().UTC().After(*repo.PlcOperationCodeExpiresAt) {
+
return helpers.ExpiredTokenError(e)
+
}
+
+
ctx := context.WithValue(e.Request().Context(), "skip-cache", true)
+
log, err := identity.FetchDidAuditLog(ctx, nil, repo.Repo.Did)
+
if err != nil {
+
s.logger.Error("error fetching doc", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
latest := log[len(log)-1]
+
+
op := plc.Operation{
+
Type: "plc_operation",
+
VerificationMethods: latest.Operation.VerificationMethods,
+
RotationKeys: latest.Operation.RotationKeys,
+
AlsoKnownAs: latest.Operation.AlsoKnownAs,
+
Services: latest.Operation.Services,
+
Prev: &latest.Cid,
+
}
+
if req.VerificationMethods != nil {
+
op.VerificationMethods = *req.VerificationMethods
+
}
+
if req.RotationKeys != nil {
+
op.RotationKeys = *req.RotationKeys
+
}
+
if req.AlsoKnownAs != nil {
+
op.AlsoKnownAs = *req.AlsoKnownAs
+
}
+
if req.Services != nil {
+
op.Services = *req.Services
+
}
+
+
k, err := atcrypto.ParsePrivateBytesK256(repo.SigningKey)
+
if err != nil {
+
s.logger.Error("error parsing signing key", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
if err := s.plcClient.SignOp(k, &op); err != nil {
+
s.logger.Error("error signing plc operation", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
if err := s.db.Exec("UPDATE repos SET plc_operation_code = NULL, plc_operation_code_expires_at = NULL WHERE did = ?", nil, repo.Repo.Did).Error; err != nil {
+
s.logger.Error("error updating repo", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
return e.JSON(200, ComAtprotoSignPlcOperationResponse{
+
Operation: op,
+
})
+
}
+87
server/handle_identity_submit_plc_operation.go
···
+
package server
+
+
import (
+
"context"
+
"slices"
+
"strings"
+
"time"
+
+
"github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/atproto/atcrypto"
+
"github.com/bluesky-social/indigo/events"
+
"github.com/bluesky-social/indigo/util"
+
"github.com/haileyok/cocoon/internal/helpers"
+
"github.com/haileyok/cocoon/models"
+
"github.com/haileyok/cocoon/plc"
+
"github.com/labstack/echo/v4"
+
)
+
+
type ComAtprotoSubmitPlcOperationRequest struct {
+
Operation plc.Operation `json:"operation"`
+
}
+
+
func (s *Server) handleSubmitPlcOperation(e echo.Context) error {
+
repo := e.Get("repo").(*models.RepoActor)
+
+
var req ComAtprotoSubmitPlcOperationRequest
+
if err := e.Bind(&req); err != nil {
+
s.logger.Error("error binding", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
if err := e.Validate(req); err != nil {
+
return helpers.InputError(e, nil)
+
}
+
if !strings.HasPrefix(repo.Repo.Did, "did:plc:") {
+
return helpers.InputError(e, nil)
+
}
+
+
op := req.Operation
+
+
k, err := atcrypto.ParsePrivateBytesK256(repo.SigningKey)
+
if err != nil {
+
s.logger.Error("error parsing key", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
required, err := s.plcClient.CreateDidCredentials(k, "", repo.Actor.Handle)
+
if err != nil {
+
s.logger.Error("error crating did credentials", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
for _, expectedKey := range required.RotationKeys {
+
if !slices.Contains(op.RotationKeys, expectedKey) {
+
return helpers.InputError(e, nil)
+
}
+
}
+
if op.Services["atproto_pds"].Type != "AtprotoPersonalDataServer" {
+
return helpers.InputError(e, nil)
+
}
+
if op.Services["atproto_pds"].Endpoint != required.Services["atproto_pds"].Endpoint {
+
return helpers.InputError(e, nil)
+
}
+
if op.VerificationMethods["atproto"] != required.VerificationMethods["atproto"] {
+
return helpers.InputError(e, nil)
+
}
+
if op.AlsoKnownAs[0] != required.AlsoKnownAs[0] {
+
return helpers.InputError(e, nil)
+
}
+
+
if err := s.plcClient.SendOperation(e.Request().Context(), repo.Repo.Did, &op); err != nil {
+
return err
+
}
+
+
if err := s.passport.BustDoc(context.TODO(), repo.Repo.Did); err != nil {
+
s.logger.Warn("error busting did doc", "error", err)
+
}
+
+
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
+
RepoIdentity: &atproto.SyncSubscribeRepos_Identity{
+
Did: repo.Repo.Did,
+
Seq: time.Now().UnixMicro(), // TODO: no
+
Time: time.Now().Format(util.ISO8601),
+
},
+
})
+
+
return nil
+
}
+9 -8
server/handle_import_repo.go
···
import (
"bytes"
-
"context"
"io"
"slices"
"strings"
···
)
func (s *Server) handleRepoImportRepo(e echo.Context) error {
+
ctx := e.Request().Context()
+
urepo := e.Get("repo").(*models.RepoActor)
b, err := io.ReadAll(e.Request().Body)
···
slices.Reverse(orderedBlocks)
-
if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil {
+
if err := bs.PutMany(ctx, orderedBlocks); err != nil {
s.logger.Error("could not insert blocks", "error", err)
return helpers.ServerError(e, nil)
}
-
r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
+
r, err := repo.OpenRepo(ctx, bs, cs.Header.Roots[0])
if err != nil {
s.logger.Error("could not open repo", "error", err)
return helpers.ServerError(e, nil)
···
clock := syntax.NewTIDClock(0)
-
if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error {
+
if err := r.ForEach(ctx, "", func(key string, cid cid.Cid) error {
pts := strings.Split(key, "/")
nsid := pts[0]
rkey := pts[1]
cidStr := cid.String()
-
b, err := bs.Get(context.TODO(), cid)
+
b, err := bs.Get(ctx, cid)
if err != nil {
s.logger.Error("record bytes don't exist in blockstore", "error", err)
return helpers.ServerError(e, nil)
···
Value: b.RawData(),
}
-
if err := tx.Create(rec).Error; err != nil {
+
if err := tx.Save(rec).Error; err != nil {
return err
}
···
tx.Commit()
-
root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
+
root, rev, err := r.Commit(ctx, urepo.SignFor)
if err != nil {
s.logger.Error("error committing", "error", err)
return helpers.ServerError(e, nil)
}
-
if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil {
+
if err := s.UpdateRepo(ctx, urepo.Repo.Did, root, rev); err != nil {
s.logger.Error("error updating repo after commit", "error", err)
return helpers.ServerError(e, nil)
}
+15 -2
server/handle_proxy.go
···
encheader := strings.TrimRight(base64.RawURLEncoding.EncodeToString(hj), "=")
+
// When proxying app.bsky.feed.getFeed the token is actually issued for the
+
// underlying feed generator and the app view passes it on. This allows the
+
// getFeed implementation to pass in the desired lxm and aud for the token
+
// and then just delegate to the general proxying logic
+
lxm, proxyTokenLxmExists := e.Get("proxyTokenLxm").(string)
+
if !proxyTokenLxmExists || lxm == "" {
+
lxm = pts[2]
+
}
+
aud, proxyTokenAudExists := e.Get("proxyTokenAud").(string)
+
if !proxyTokenAudExists || aud == "" {
+
aud = svcDid
+
}
+
payload := map[string]any{
"iss": repo.Repo.Did,
-
"aud": svcDid,
-
"lxm": pts[2],
+
"aud": aud,
+
"lxm": lxm,
"jti": uuid.NewString(),
"exp": time.Now().Add(1 * time.Minute).UTC().Unix(),
}
+35
server/handle_proxy_get_feed.go
···
+
package server
+
+
import (
+
"github.com/Azure/go-autorest/autorest/to"
+
"github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/api/bsky"
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
"github.com/bluesky-social/indigo/xrpc"
+
"github.com/haileyok/cocoon/internal/helpers"
+
"github.com/labstack/echo/v4"
+
)
+
+
func (s *Server) handleProxyBskyFeedGetFeed(e echo.Context) error {
+
feedUri, err := syntax.ParseATURI(e.QueryParam("feed"))
+
if err != nil {
+
return helpers.InputError(e, to.StringPtr("invalid feed uri"))
+
}
+
+
appViewEndpoint, _, err := s.getAtprotoProxyEndpointFromRequest(e)
+
if err != nil {
+
e.Logger().Error("could not get atproto proxy", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
appViewClient := xrpc.Client{
+
Host: appViewEndpoint,
+
}
+
feedRecord, err := atproto.RepoGetRecord(e.Request().Context(), &appViewClient, "", feedUri.Collection().String(), feedUri.Authority().String(), feedUri.RecordKey().String())
+
feedGeneratorDid := feedRecord.Value.Val.(*bsky.FeedGenerator).Did
+
+
e.Set("proxyTokenLxm", "app.bsky.feed.getFeedSkeleton")
+
e.Set("proxyTokenAud", feedGeneratorDid)
+
+
return s.handleProxy(e)
+
}
+3 -1
server/handle_repo_apply_writes.go
···
}
func (s *Server) handleApplyWrites(e echo.Context) error {
+
ctx := e.Request().Context()
+
repo := e.Get("repo").(*models.RepoActor)
var req ComAtprotoRepoApplyWritesRequest
···
})
}
-
results, err := s.repoman.applyWrites(repo.Repo, ops, req.SwapCommit)
+
results, err := s.repoman.applyWrites(ctx, repo.Repo, ops, req.SwapCommit)
if err != nil {
s.logger.Error("error applying writes", "error", err)
return helpers.ServerError(e, nil)
+3 -1
server/handle_repo_create_record.go
···
}
func (s *Server) handleCreateRecord(e echo.Context) error {
+
ctx := e.Request().Context()
+
repo := e.Get("repo").(*models.RepoActor)
var req ComAtprotoRepoCreateRecordRequest
···
optype = OpTypeUpdate
}
-
results, err := s.repoman.applyWrites(repo.Repo, []Op{
+
results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{
{
Type: optype,
Collection: req.Collection,
+3 -1
server/handle_repo_delete_record.go
···
}
func (s *Server) handleDeleteRecord(e echo.Context) error {
+
ctx := e.Request().Context()
+
repo := e.Get("repo").(*models.RepoActor)
var req ComAtprotoRepoDeleteRecordRequest
···
return helpers.InputError(e, nil)
}
-
results, err := s.repoman.applyWrites(repo.Repo, []Op{
+
results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{
{
Type: OpTypeDelete,
Collection: req.Collection,
+3 -1
server/handle_repo_put_record.go
···
}
func (s *Server) handlePutRecord(e echo.Context) error {
+
ctx := e.Request().Context()
+
repo := e.Get("repo").(*models.RepoActor)
var req ComAtprotoRepoPutRecordRequest
···
optype = OpTypeUpdate
}
-
results, err := s.repoman.applyWrites(repo.Repo, []Op{
+
results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{
{
Type: optype,
Collection: req.Collection,
+46 -15
server/handle_server_check_account_status.go
···
package server
import (
+
"errors"
+
"sync"
+
+
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/models"
"github.com/ipfs/go-cid"
···
func (s *Server) handleServerCheckAccountStatus(e echo.Context) error {
urepo := e.Get("repo").(*models.RepoActor)
+
_, didErr := syntax.ParseDID(urepo.Repo.Did)
+
if didErr != nil {
+
s.logger.Error("error validating did", "err", didErr)
+
}
+
resp := ComAtprotoServerCheckAccountStatusResponse{
Activated: true, // TODO: should allow for deactivation etc.
-
ValidDid: true, // TODO: should probably verify?
+
ValidDid: didErr == nil,
RepoRev: urepo.Rev,
ImportedBlobs: 0, // TODO: ???
}
···
s.logger.Error("error casting cid", "error", err)
return helpers.ServerError(e, nil)
}
+
resp.RepoCommit = rootcid.String()
type CountResp struct {
···
}
var blockCtResp CountResp
-
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM blocks WHERE did = ?", nil, urepo.Repo.Did).Scan(&blockCtResp).Error; err != nil {
-
s.logger.Error("error getting block count", "error", err)
-
return helpers.ServerError(e, nil)
-
}
-
resp.RepoBlocks = blockCtResp.Ct
+
var recCtResp CountResp
+
var blobCtResp CountResp
+
+
var wg sync.WaitGroup
+
var procErr error
+
+
wg.Add(1)
+
go func() {
+
defer wg.Done()
+
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM blocks WHERE did = ?", nil, urepo.Repo.Did).Scan(&blockCtResp).Error; err != nil {
+
s.logger.Error("error getting block count", "error", err)
+
procErr = errors.Join(procErr, err)
+
}
+
}()
+
+
wg.Add(1)
+
go func() {
+
defer wg.Done()
+
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&recCtResp).Error; err != nil {
+
s.logger.Error("error getting record count", "error", err)
+
procErr = errors.Join(procErr, err)
+
}
+
}()
-
var recCtResp CountResp
-
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&recCtResp).Error; err != nil {
-
s.logger.Error("error getting record count", "error", err)
-
return helpers.ServerError(e, nil)
-
}
-
resp.IndexedRecords = recCtResp.Ct
+
wg.Add(1)
+
go func() {
+
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM blobs WHERE did = ?", nil, urepo.Repo.Did).Scan(&blobCtResp).Error; err != nil {
+
s.logger.Error("error getting expected blobs count", "error", err)
+
procErr = errors.Join(procErr, err)
+
}
+
}()
-
var blobCtResp CountResp
-
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM blobs WHERE did = ?", nil, urepo.Repo.Did).Scan(&blobCtResp).Error; err != nil {
-
s.logger.Error("error getting record count", "error", err)
+
wg.Wait()
+
if procErr != nil {
return helpers.ServerError(e, nil)
}
+
+
resp.RepoBlocks = blockCtResp.Ct
+
resp.IndexedRecords = recCtResp.Ct
resp.ExpectedBlobs = blobCtResp.Ct
return e.JSON(200, resp)
+3 -1
server/handle_sync_get_record.go
···
)
func (s *Server) handleSyncGetRecord(e echo.Context) error {
+
ctx := e.Request().Context()
+
did := e.QueryParam("did")
collection := e.QueryParam("collection")
rkey := e.QueryParam("rkey")
···
return helpers.ServerError(e, nil)
}
-
root, blocks, err := s.repoman.getRecordProof(urepo, collection, rkey)
+
root, blocks, err := s.repoman.getRecordProof(ctx, urepo, collection, rkey)
if err != nil {
return err
}
+19
server/mail.go
···
return nil
}
+
func (s *Server) sendPlcTokenReset(email, handle, code string) error {
+
if s.mail == nil {
+
return nil
+
}
+
+
s.mailLk.Lock()
+
defer s.mailLk.Unlock()
+
+
s.mail.To(email)
+
s.mail.Subject("PLC token for " + s.config.Hostname)
+
s.mail.Plain().Set(fmt.Sprintf("Hello %s. Your PLC operation code is %s. This code will expire in ten minutes.", handle, code))
+
+
if err := s.mail.Send(); err != nil {
+
return err
+
}
+
+
return nil
+
}
+
func (s *Server) sendEmailUpdate(email, handle, code string) error {
if s.mail == nil {
return nil
+19 -16
server/repo.go
···
}
// TODO make use of swap commit
-
func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
+
func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
rootcid, err := cid.Cast(urepo.Root)
if err != nil {
return nil, err
···
dbs := rm.s.getBlockstore(urepo.Did)
bs := recording_blockstore.New(dbs)
-
r, err := repo.OpenRepo(context.TODO(), bs, rootcid)
+
r, err := repo.OpenRepo(ctx, bs, rootcid)
-
entries := []models.Record{}
-
var results []ApplyWriteResult
+
entries := make([]models.Record, 0, len(writes))
+
results := make([]ApplyWriteResult, 0, len(writes))
for i, op := range writes {
if op.Type != OpTypeCreate && op.Rkey == nil {
return nil, fmt.Errorf("invalid rkey")
} else if op.Type == OpTypeCreate && op.Rkey != nil {
-
_, _, err := r.GetRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
+
_, _, err := r.GetRecord(ctx, op.Collection+"/"+*op.Rkey)
if err == nil {
op.Type = OpTypeUpdate
}
···
mm["$type"] = op.Collection
}
-
nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm)
+
nc, err := r.PutRecord(ctx, op.Collection+"/"+*op.Rkey, &mm)
if err != nil {
return nil, err
}
···
Rkey: *op.Rkey,
Value: old.Value,
})
-
err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
+
err := r.DeleteRecord(ctx, op.Collection+"/"+*op.Rkey)
if err != nil {
return nil, err
}
···
return nil, err
}
mm := MarshalableMap(out)
-
nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm)
+
nc, err := r.UpdateRecord(ctx, op.Collection+"/"+*op.Rkey, &mm)
if err != nil {
return nil, err
}
···
}
}
-
newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor)
+
newroot, rev, err := r.Commit(ctx, urepo.SignFor)
if err != nil {
return nil, err
}
···
Roots: []cid.Cid{newroot},
Version: 1,
})
+
if err != nil {
+
return nil, err
+
}
if _, err := carstore.LdWrite(buf, hb); err != nil {
return nil, err
}
-
diffops, err := r.DiffSince(context.TODO(), rootcid)
+
diffops, err := r.DiffSince(ctx, rootcid)
if err != nil {
return nil, err
}
···
})
}
-
blk, err := dbs.Get(context.TODO(), c)
+
blk, err := dbs.Get(ctx, c)
if err != nil {
return nil, err
}
···
}
}
-
rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
+
rm.s.evtman.AddEvent(ctx, &events.XRPCStreamEvent{
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
Repo: urepo.Did,
Blocks: buf.Bytes(),
···
},
})
-
if err := rm.s.UpdateRepo(context.TODO(), urepo.Did, newroot, rev); err != nil {
+
if err := rm.s.UpdateRepo(ctx, urepo.Did, newroot, rev); err != nil {
return nil, err
}
···
return results, nil
}
-
func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
+
func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
c, err := cid.Cast(urepo.Root)
if err != nil {
return cid.Undef, nil, err
···
dbs := rm.s.getBlockstore(urepo.Did)
bs := recording_blockstore.New(dbs)
-
r, err := repo.OpenRepo(context.TODO(), bs, c)
+
r, err := repo.OpenRepo(ctx, bs, c)
if err != nil {
return cid.Undef, nil, err
}
-
_, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey)
+
_, _, err = r.GetRecordBytes(ctx, collection+"/"+rkey)
if err != nil {
return cid.Undef, nil, err
}
+5
server/server.go
···
s.echo.GET("/xrpc/com.atproto.server.getSession", s.handleGetSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.server.refreshSession", s.handleRefreshSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.server.deleteSession", s.handleDeleteSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
+
s.echo.GET("/xrpc/com.atproto.identity.getRecommendedDidCredentials", s.handleGetRecommendedDidCredentials, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.identity.updateHandle", s.handleIdentityUpdateHandle, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
+
s.echo.POST("/xrpc/com.atproto.identity.requestPlcOperationSignature", s.handleIdentityRequestPlcOperationSignature, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
+
s.echo.POST("/xrpc/com.atproto.identity.signPlcOperation", s.handleSignPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
+
s.echo.POST("/xrpc/com.atproto.identity.submitPlcOperation", s.handleSubmitPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.server.confirmEmail", s.handleServerConfirmEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.server.requestEmailConfirmation", s.handleServerRequestEmailConfirmation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.server.requestPasswordReset", s.handleServerRequestPasswordReset) // AUTH NOT REQUIRED FOR THIS ONE
···
// stupid silly endpoints
s.echo.GET("/xrpc/app.bsky.actor.getPreferences", s.handleActorGetPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
s.echo.POST("/xrpc/app.bsky.actor.putPreferences", s.handleActorPutPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
+
s.echo.GET("/xrpc/app.bsky.feed.getFeed", s.handleProxyBskyFeedGetFeed, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
// admin routes
s.echo.POST("/xrpc/com.atproto.server.createInviteCode", s.handleCreateInviteCode, s.handleAdminMiddleware)