An atproto PDS written in Go

Compare changes

Choose any two refs to compare.

+58
.github/workflows/docker-image.yml
···
+
name: Docker image
+
+
on:
+
workflow_dispatch:
+
push:
+
+
env:
+
REGISTRY: ghcr.io
+
IMAGE_NAME: ${{ github.repository }}
+
+
jobs:
+
build-and-push-image:
+
runs-on: ubuntu-latest
+
# Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job.
+
permissions:
+
contents: read
+
packages: write
+
attestations: write
+
id-token: write
+
#
+
steps:
+
- name: Checkout repository
+
uses: actions/checkout@v4
+
# Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
+
- name: Log in to the Container registry
+
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
+
with:
+
registry: ${{ env.REGISTRY }}
+
username: ${{ github.actor }}
+
password: ${{ secrets.GITHUB_TOKEN }}
+
# This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels.
+
- name: Extract metadata (tags, labels) for Docker
+
id: meta
+
uses: docker/metadata-action@v5
+
with:
+
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
+
tags: |
+
type=sha
+
type=sha,format=long
+
# This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages.
+
# It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository.
+
# It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step.
+
- name: Build and push Docker image
+
id: push
+
uses: docker/build-push-action@v5
+
with:
+
context: .
+
push: true
+
tags: ${{ steps.meta.outputs.tags }}
+
labels: ${{ steps.meta.outputs.labels }}
+
+
# This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see "[AUTOTITLE](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds)."
+
- name: Generate artifact attestation
+
uses: actions/attest-build-provenance@v1
+
with:
+
subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME}}
+
subject-digest: ${{ steps.push.outputs.digest }}
+
push-to-registry: true
+25
Dockerfile
···
+
### Compile stage
+
FROM golang:1.25.1-bookworm AS build-env
+
+
ADD . /dockerbuild
+
WORKDIR /dockerbuild
+
+
RUN GIT_VERSION=$(git describe --tags --long --always || echo "dev-local") && \
+
go mod tidy && \
+
go build -ldflags "-X main.Version=$GIT_VERSION" -o cocoon ./cmd/cocoon
+
+
### Run stage
+
FROM debian:bookworm-slim AS run
+
+
RUN apt-get update && apt-get install -y dumb-init runit ca-certificates && rm -rf /var/lib/apt/lists/*
+
ENTRYPOINT ["dumb-init", "--"]
+
+
WORKDIR /
+
RUN mkdir -p data/cocoon
+
COPY --from=build-env /dockerbuild/cocoon /
+
+
CMD ["/cocoon", "run"]
+
+
LABEL org.opencontainers.image.source=https://github.com/haileyok/cocoon
+
LABEL org.opencontainers.image.description="Cocoon ATProto PDS"
+
LABEL org.opencontainers.image.licenses=MIT
+4
Makefile
···
.env:
if [ ! -f ".env" ]; then cp example.dev.env .env; fi
+
+
.PHONY: docker-build
+
docker-build:
+
docker build -t cocoon .
+2 -2
README.md
···
### Server
-
- [ ] `com.atproto.server.activateAccount`
+
- [x] `com.atproto.server.activateAccount`
- [x] `com.atproto.server.checkAccountStatus`
- [x] `com.atproto.server.confirmEmail`
- [x] `com.atproto.server.createAccount`
- [x] `com.atproto.server.createInviteCode`
- [x] `com.atproto.server.createInviteCodes`
-
- [ ] `com.atproto.server.deactivateAccount`
+
- [x] `com.atproto.server.deactivateAccount`
- [ ] `com.atproto.server.deleteAccount`
- [x] `com.atproto.server.deleteSession`
- [x] `com.atproto.server.describeServer`
+20 -16
cmd/cocoon/main.go
···
"os"
"time"
-
"github.com/bluesky-social/indigo/atproto/crypto"
+
"github.com/bluesky-social/indigo/atproto/atcrypto"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/server"
···
Name: "s3-backups-enabled",
EnvVars: []string{"COCOON_S3_BACKUPS_ENABLED"},
},
+
&cli.BoolFlag{
+
Name: "s3-blobstore-enabled",
+
EnvVars: []string{"COCOON_S3_BLOBSTORE_ENABLED"},
+
},
&cli.StringFlag{
Name: "s3-region",
EnvVars: []string{"COCOON_S3_REGION"},
···
EnvVars: []string{"COCOON_SESSION_SECRET"},
},
&cli.StringFlag{
-
Name: "default-atproto-proxy",
-
EnvVars: []string{"COCOON_DEFAULT_ATPROTO_PROXY"},
-
Value: "did:web:api.bsky.app#bsky_appview",
-
},
-
&cli.StringFlag{
Name: "blockstore-variant",
EnvVars: []string{"COCOON_BLOCKSTORE_VARIANT"},
Value: "sqlite",
+
},
+
&cli.StringFlag{
+
Name: "fallback-proxy",
+
EnvVars: []string{"COCOON_FALLBACK_PROXY"},
},
},
Commands: []*cli.Command{
···
SmtpEmail: cmd.String("smtp-email"),
SmtpName: cmd.String("smtp-name"),
S3Config: &server.S3Config{
-
BackupsEnabled: cmd.Bool("s3-backups-enabled"),
-
Region: cmd.String("s3-region"),
-
Bucket: cmd.String("s3-bucket"),
-
Endpoint: cmd.String("s3-endpoint"),
-
AccessKey: cmd.String("s3-access-key"),
-
SecretKey: cmd.String("s3-secret-key"),
+
BackupsEnabled: cmd.Bool("s3-backups-enabled"),
+
BlobstoreEnabled: cmd.Bool("s3-blobstore-enabled"),
+
Region: cmd.String("s3-region"),
+
Bucket: cmd.String("s3-bucket"),
+
Endpoint: cmd.String("s3-endpoint"),
+
AccessKey: cmd.String("s3-access-key"),
+
SecretKey: cmd.String("s3-secret-key"),
},
-
SessionSecret: cmd.String("session-secret"),
-
DefaultAtprotoProxy: cmd.String("default-atproto-proxy"),
-
BlockstoreVariant: server.MustReturnBlockstoreVariant(cmd.String("blockstore-variant")),
+
SessionSecret: cmd.String("session-secret"),
+
BlockstoreVariant: server.MustReturnBlockstoreVariant(cmd.String("blockstore-variant")),
+
FallbackProxy: cmd.String("fallback-proxy"),
})
if err != nil {
fmt.Printf("error creating cocoon: %v", err)
···
},
},
Action: func(cmd *cli.Context) error {
-
key, err := crypto.GeneratePrivateKeyK256()
+
key, err := atcrypto.GeneratePrivateKeyK256()
if err != nil {
return err
}
+2 -2
go.mod
···
require (
github.com/Azure/go-autorest/autorest/to v0.4.1
github.com/aws/aws-sdk-go v1.55.7
-
github.com/bluesky-social/indigo v0.0.0-20250414202759-826fcdeaa36b
+
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792
github.com/domodwyer/mailyak/v3 v3.6.2
github.com/go-pkgz/expirable-cache/v3 v3.0.0
···
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
+
github.com/ipfs/go-ipfs-blockstore v1.3.1
github.com/ipfs/go-ipld-cbor v0.1.0
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4
github.com/joho/godotenv v1.5.1
···
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-blockservice v0.5.2 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect
-
github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
+2 -4
go.sum
···
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
-
github.com/bluesky-social/indigo v0.0.0-20250414202759-826fcdeaa36b h1:elwfbe+W7GkUmPKFX1h7HaeHvC/kC0XJWfiEHC62xPg=
-
github.com/bluesky-social/indigo v0.0.0-20250414202759-826fcdeaa36b/go.mod h1:yjdhLA1LkK8VDS/WPUoYPo25/Hq/8rX38Ftr67EsqKY=
+
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe h1:VBhaqE5ewQgXbY5SfSWFZC/AwHFo7cHxZKFYi2ce9Yo=
+
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe/go.mod h1:RuQVrCGm42QNsgumKaR6se+XkFKfCPNwdCiTvqKRUck=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc=
···
github.com/ipfs/go-block-format v0.2.0/go.mod h1:+jpL11nFx5A/SPpsoBn6Bzkra/zaArfSmsknbPMYgzM=
github.com/ipfs/go-blockservice v0.5.2 h1:in9Bc+QcXwd1apOVM7Un9t8tixPKdaHQFdLSUM1Xgk8=
github.com/ipfs/go-blockservice v0.5.2/go.mod h1:VpMblFEqG67A/H2sHKAemeH9vlURVavlysbdUI632yk=
-
github.com/ipfs/go-bs-sqlite3 v0.0.0-20221122195556-bfcee1be620d h1:9V+GGXCuOfDiFpdAHz58q9mKLg447xp0cQKvqQrAwYE=
-
github.com/ipfs/go-bs-sqlite3 v0.0.0-20221122195556-bfcee1be620d/go.mod h1:pMbnFyNAGjryYCLCe59YDLRv/ujdN+zGJBT1umlvYRM=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk=
github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk=
+17 -2
models/models.go
···
"context"
"time"
-
"github.com/bluesky-social/indigo/atproto/crypto"
+
"github.com/Azure/go-autorest/autorest/to"
+
"github.com/bluesky-social/indigo/atproto/atcrypto"
)
type Repo struct {
···
Rev string
Root []byte
Preferences []byte
+
Deactivated bool
}
func (r *Repo) SignFor(ctx context.Context, did string, msg []byte) ([]byte, error) {
-
k, err := crypto.ParsePrivateBytesK256(r.SigningKey)
+
k, err := atcrypto.ParsePrivateBytesK256(r.SigningKey)
if err != nil {
return nil, err
}
···
}
return sig, nil
+
}
+
+
func (r *Repo) Status() *string {
+
var status *string
+
if r.Deactivated {
+
status = to.StringPtr("deactivated")
+
}
+
return status
+
}
+
+
func (r *Repo) Active() bool {
+
return r.Status() == nil
}
type Actor struct {
···
Did string `gorm:"index;index:idx_blob_did_cid"`
Cid []byte `gorm:"index;index:idx_blob_did_cid"`
RefCount int
+
Storage string `gorm:"default:sqlite;check:storage in ('sqlite', 's3')"`
}
type BlobPart struct {
+11 -3
oauth/client/manager.go
···
}
var jwks jwk.Key
-
if metadata.JWKS != nil {
+
if metadata.JWKS != nil && len(metadata.JWKS.Keys) > 0 {
// TODO: this is kinda bad but whatever for now. there could obviously be more than one jwk, and we need to
// make sure we use the right one
-
k, err := helpers.ParseJWKFromBytes((*metadata.JWKS)[0])
+
b, err := json.Marshal(metadata.JWKS.Keys[0])
if err != nil {
return nil, err
}
+
+
k, err := helpers.ParseJWKFromBytes(b)
+
if err != nil {
+
return nil, err
+
}
+
jwks = k
} else if metadata.JWKSURI != nil {
maybeJwks, err := cm.getClientJwks(ctx, clientId, *metadata.JWKSURI)
···
}
jwks = maybeJwks
+
} else {
+
return nil, fmt.Errorf("no valid jwks found in oauth client metadata")
}
return &Client{
···
return nil, errors.New("private_key_jwt auth method requires jwks or jwks_uri")
}
-
if metadata.JWKS != nil && len(*metadata.JWKS) == 0 {
+
if metadata.JWKS != nil && len(metadata.JWKS.Keys) == 0 {
return nil, errors.New("private_key_jwt auth method requires atleast one key in jwks")
}
+20 -16
oauth/client/metadata.go
···
package client
type Metadata struct {
-
ClientID string `json:"client_id"`
-
ClientName string `json:"client_name"`
-
ClientURI string `json:"client_uri"`
-
LogoURI string `json:"logo_uri"`
-
TOSURI string `json:"tos_uri"`
-
PolicyURI string `json:"policy_uri"`
-
RedirectURIs []string `json:"redirect_uris"`
-
GrantTypes []string `json:"grant_types"`
-
ResponseTypes []string `json:"response_types"`
-
ApplicationType string `json:"application_type"`
-
DpopBoundAccessTokens bool `json:"dpop_bound_access_tokens"`
-
JWKSURI *string `json:"jwks_uri,omitempty"`
-
JWKS *[][]byte `json:"jwks,omitempty"`
-
Scope string `json:"scope"`
-
TokenEndpointAuthMethod string `json:"token_endpoint_auth_method"`
-
TokenEndpointAuthSigningAlg string `json:"token_endpoint_auth_signing_alg"`
+
ClientID string `json:"client_id"`
+
ClientName string `json:"client_name"`
+
ClientURI string `json:"client_uri"`
+
LogoURI string `json:"logo_uri"`
+
TOSURI string `json:"tos_uri"`
+
PolicyURI string `json:"policy_uri"`
+
RedirectURIs []string `json:"redirect_uris"`
+
GrantTypes []string `json:"grant_types"`
+
ResponseTypes []string `json:"response_types"`
+
ApplicationType string `json:"application_type"`
+
DpopBoundAccessTokens bool `json:"dpop_bound_access_tokens"`
+
JWKSURI *string `json:"jwks_uri,omitempty"`
+
JWKS *MetadataJwks `json:"jwks,omitempty"`
+
Scope string `json:"scope"`
+
TokenEndpointAuthMethod string `json:"token_endpoint_auth_method"`
+
TokenEndpointAuthSigningAlg string `json:"token_endpoint_auth_signing_alg"`
+
}
+
+
type MetadataJwks struct {
+
Keys []any `json:"keys"`
}
+6 -2
oauth/dpop/manager.go
···
Hostname string
}
+
var (
+
ErrUseDpopNonce = errors.New("use_dpop_nonce")
+
)
+
func NewManager(args ManagerArgs) *Manager {
if args.Logger == nil {
args.Logger = slog.Default()
···
nonce, _ := claims["nonce"].(string)
if nonce == "" {
// WARN: this _must_ be `use_dpop_nonce` for clients know they should make another request
-
return nil, errors.New("use_dpop_nonce")
+
return nil, ErrUseDpopNonce
}
if nonce != "" && !dm.nonce.Check(nonce) {
// WARN: this _must_ be `use_dpop_nonce` so that clients will fetch a new nonce
-
return nil, errors.New("use_dpop_nonce")
+
return nil, ErrUseDpopNonce
}
ath, _ := claims["ath"].(string)
+5 -5
plc/client.go
···
"net/url"
"strings"
-
"github.com/bluesky-social/indigo/atproto/crypto"
+
"github.com/bluesky-social/indigo/atproto/atcrypto"
"github.com/bluesky-social/indigo/util"
"github.com/haileyok/cocoon/identity"
)
···
h *http.Client
service string
pdsHostname string
-
rotationKey *crypto.PrivateKeyK256
+
rotationKey *atcrypto.PrivateKeyK256
}
type ClientArgs struct {
···
args.H = util.RobustHTTPClient()
}
-
rk, err := crypto.ParsePrivateBytesK256([]byte(args.RotationKey))
+
rk, err := atcrypto.ParsePrivateBytesK256([]byte(args.RotationKey))
if err != nil {
return nil, err
}
···
}, nil
}
-
func (c *Client) CreateDID(sigkey *crypto.PrivateKeyK256, recovery string, handle string) (string, *Operation, error) {
+
func (c *Client) CreateDID(sigkey *atcrypto.PrivateKeyK256, recovery string, handle string) (string, *Operation, error) {
pubsigkey, err := sigkey.PublicKey()
if err != nil {
return "", nil, err
···
return did, &op, nil
}
-
func (c *Client) SignOp(sigkey *crypto.PrivateKeyK256, op *Operation) error {
+
func (c *Client) SignOp(sigkey *atcrypto.PrivateKeyK256, op *Operation) error {
b, err := op.MarshalCBOR()
if err != nil {
return err
+2 -2
plc/types.go
···
import (
"encoding/json"
-
"github.com/bluesky-social/indigo/atproto/data"
+
"github.com/bluesky-social/indigo/atproto/atdata"
"github.com/haileyok/cocoon/identity"
cbg "github.com/whyrusleeping/cbor-gen"
)
···
return nil, err
}
-
b, err = data.MarshalCBOR(m)
+
b, err = atdata.MarshalCBOR(m)
if err != nil {
return nil, err
}
+1 -1
server/handle_actor_get_preferences.go
···
err := json.Unmarshal(repo.Preferences, &prefs)
if err != nil || prefs["preferences"] == nil {
prefs = map[string]any{
-
"preferences": map[string]any{},
+
"preferences": []any{},
}
}
+2 -11
server/handle_identity_update_handle.go
···
"github.com/Azure/go-autorest/autorest/to"
"github.com/bluesky-social/indigo/api/atproto"
-
"github.com/bluesky-social/indigo/atproto/crypto"
+
"github.com/bluesky-social/indigo/atproto/atcrypto"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/util"
"github.com/haileyok/cocoon/identity"
···
Prev: &latest.Cid,
}
-
k, err := crypto.ParsePrivateBytesK256(repo.SigningKey)
+
k, err := atcrypto.ParsePrivateBytesK256(repo.SigningKey)
if err != nil {
s.logger.Error("error parsing signing key", "error", err)
return helpers.ServerError(e, nil)
···
if err := s.passport.BustDoc(context.TODO(), repo.Repo.Did); err != nil {
s.logger.Warn("error busting did doc", "error", err)
}
-
-
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
-
RepoHandle: &atproto.SyncSubscribeRepos_Handle{
-
Did: repo.Repo.Did,
-
Handle: req.Handle,
-
Seq: time.Now().UnixMicro(), // TODO: no
-
Time: time.Now().Format(util.ISO8601),
-
},
-
})
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
RepoIdentity: &atproto.SyncSubscribeRepos_Identity{
+8 -1
server/handle_oauth_par.go
···
package server
import (
+
"errors"
"time"
"github.com/Azure/go-autorest/autorest/to"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/oauth"
"github.com/haileyok/cocoon/oauth/constants"
+
"github.com/haileyok/cocoon/oauth/dpop"
"github.com/haileyok/cocoon/oauth/provider"
"github.com/labstack/echo/v4"
)
···
// TODO: this seems wrong. should be a way to get the entire request url i believe, but this will work for now
dpopProof, err := s.oauthProvider.DpopManager.CheckProof(e.Request().Method, "https://"+s.config.Hostname+e.Request().URL.String(), e.Request().Header, nil)
if err != nil {
+
if errors.Is(err, dpop.ErrUseDpopNonce) {
+
return e.JSON(400, map[string]string{
+
"error": "use_dpop_nonce",
+
})
+
}
s.logger.Error("error getting dpop proof", "error", err)
-
return helpers.InputError(e, to.StringPtr(err.Error()))
+
return helpers.InputError(e, nil)
}
client, clientAuth, err := s.oauthProvider.AuthenticateClient(e.Request().Context(), parRequest.AuthenticateClientRequestBase, dpopProof, &provider.AuthenticateClientOptions{
+8 -1
server/handle_oauth_token.go
···
"bytes"
"crypto/sha256"
"encoding/base64"
+
"errors"
"fmt"
"slices"
"time"
···
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/oauth"
"github.com/haileyok/cocoon/oauth/constants"
+
"github.com/haileyok/cocoon/oauth/dpop"
"github.com/haileyok/cocoon/oauth/provider"
"github.com/labstack/echo/v4"
)
···
proof, err := s.oauthProvider.DpopManager.CheckProof(e.Request().Method, e.Request().URL.String(), e.Request().Header, nil)
if err != nil {
+
if errors.Is(err, dpop.ErrUseDpopNonce) {
+
return e.JSON(400, map[string]string{
+
"error": "use_dpop_nonce",
+
})
+
}
s.logger.Error("error getting dpop proof", "error", err)
-
return helpers.InputError(e, to.StringPtr(err.Error()))
+
return helpers.InputError(e, nil)
}
client, clientAuth, err := s.oauthProvider.AuthenticateClient(e.Request().Context(), req.AuthenticateClientRequestBase, proof, &provider.AuthenticateClientOptions{
+2 -2
server/handle_proxy.go
···
func (s *Server) getAtprotoProxyEndpointFromRequest(e echo.Context) (string, string, error) {
svc := e.Request().Header.Get("atproto-proxy")
-
if svc == "" {
-
svc = s.config.DefaultAtprotoProxy
+
if svc == "" && s.config.FallbackProxy != "" {
+
svc = s.config.FallbackProxy
}
svcPts := strings.Split(svc, "#")
+2 -2
server/handle_repo_get_record.go
···
package server
import (
-
"github.com/bluesky-social/indigo/atproto/data"
+
"github.com/bluesky-social/indigo/atproto/atdata"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/haileyok/cocoon/models"
"github.com/labstack/echo/v4"
···
return err
}
-
val, err := data.UnmarshalCBOR(record.Value)
+
val, err := atdata.UnmarshalCBOR(record.Value)
if err != nil {
return s.handleProxy(e) // TODO: this should be getting handled like...if we don't find it in the db. why doesn't it throw error up there?
}
+2 -2
server/handle_repo_list_records.go
···
"strconv"
"github.com/Azure/go-autorest/autorest/to"
-
"github.com/bluesky-social/indigo/atproto/data"
+
"github.com/bluesky-social/indigo/atproto/atdata"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/models"
···
items := []ComAtprotoRepoListRecordsRecordItem{}
for _, r := range records {
-
val, err := data.UnmarshalCBOR(r.Value)
+
val, err := atdata.UnmarshalCBOR(r.Value)
if err != nil {
return err
}
+2 -2
server/handle_repo_list_repos.go
···
Did: r.Did,
Head: c.String(),
Rev: r.Rev,
-
Active: true,
-
Status: nil,
+
Active: r.Active(),
+
Status: r.Status(),
})
}
+50 -8
server/handle_repo_upload_blob.go
···
import (
"bytes"
+
"fmt"
"io"
+
"github.com/aws/aws-sdk-go/aws"
+
"github.com/aws/aws-sdk-go/aws/credentials"
+
"github.com/aws/aws-sdk-go/aws/session"
+
"github.com/aws/aws-sdk-go/service/s3"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/models"
"github.com/ipfs/go-cid"
···
mime = "application/octet-stream"
}
+
storage := "sqlite"
+
s3Upload := s.s3Config != nil && s.s3Config.BlobstoreEnabled
+
if s3Upload {
+
storage = "s3"
+
}
blob := models.Blob{
Did: urepo.Repo.Did,
RefCount: 0,
CreatedAt: s.repoman.clock.Next().String(),
+
Storage: storage,
}
if err := s.db.Create(&blob, nil).Error; err != nil {
···
read += n
fulldata.Write(data)
-
blobPart := models.BlobPart{
-
BlobID: blob.ID,
-
Idx: part,
-
Data: data,
-
}
+
if !s3Upload {
+
blobPart := models.BlobPart{
+
BlobID: blob.ID,
+
Idx: part,
+
Data: data,
+
}
-
if err := s.db.Create(&blobPart, nil).Error; err != nil {
-
s.logger.Error("error adding blob part to db", "error", err)
-
return helpers.ServerError(e, nil)
+
if err := s.db.Create(&blobPart, nil).Error; err != nil {
+
s.logger.Error("error adding blob part to db", "error", err)
+
return helpers.ServerError(e, nil)
+
}
}
part++
···
if err != nil {
s.logger.Error("error creating cid prefix", "error", err)
return helpers.ServerError(e, nil)
+
}
+
+
if s3Upload {
+
config := &aws.Config{
+
Region: aws.String(s.s3Config.Region),
+
Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
+
}
+
+
if s.s3Config.Endpoint != "" {
+
config.Endpoint = aws.String(s.s3Config.Endpoint)
+
config.S3ForcePathStyle = aws.Bool(true)
+
}
+
+
sess, err := session.NewSession(config)
+
if err != nil {
+
s.logger.Error("error creating aws session", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
svc := s3.New(sess)
+
+
if _, err := svc.PutObject(&s3.PutObjectInput{
+
Bucket: aws.String(s.s3Config.Bucket),
+
Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
+
Body: bytes.NewReader(fulldata.Bytes()),
+
}); err != nil {
+
s.logger.Error("error uploading blob to s3", "error", err)
+
return helpers.ServerError(e, nil)
+
}
}
if err := s.db.Exec("UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil {
+45
server/handle_server_activate_account.go
···
+
package server
+
+
import (
+
"context"
+
"time"
+
+
"github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/events"
+
"github.com/bluesky-social/indigo/util"
+
"github.com/haileyok/cocoon/internal/helpers"
+
"github.com/haileyok/cocoon/models"
+
"github.com/labstack/echo/v4"
+
)
+
+
type ComAtprotoServerActivateAccountRequest struct {
+
// NOTE: this implementation will not pay attention to this value
+
DeleteAfter time.Time `json:"deleteAfter"`
+
}
+
+
func (s *Server) handleServerActivateAccount(e echo.Context) error {
+
var req ComAtprotoServerDeactivateAccountRequest
+
if err := e.Bind(&req); err != nil {
+
s.logger.Error("error binding", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
urepo := e.Get("repo").(*models.RepoActor)
+
+
if err := s.db.Exec("UPDATE repos SET deactivated = ? WHERE did = ?", nil, false, urepo.Repo.Did).Error; err != nil {
+
s.logger.Error("error updating account status to deactivated", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
+
RepoAccount: &atproto.SyncSubscribeRepos_Account{
+
Active: true,
+
Did: urepo.Repo.Did,
+
Status: nil,
+
Seq: time.Now().UnixMicro(), // TODO: bad puppy
+
Time: time.Now().Format(util.ISO8601),
+
},
+
})
+
+
return e.NoContent(200)
+
}
+2 -11
server/handle_server_create_account.go
···
"github.com/Azure/go-autorest/autorest/to"
"github.com/bluesky-social/indigo/api/atproto"
-
"github.com/bluesky-social/indigo/atproto/crypto"
+
"github.com/bluesky-social/indigo/atproto/atcrypto"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/repo"
···
// TODO: unsupported domains
-
k, err := crypto.GeneratePrivateKeyK256()
+
k, err := atcrypto.GeneratePrivateKeyK256()
if err != nil {
s.logger.Error("error creating signing key", "endpoint", "com.atproto.server.createAccount", "error", err)
return helpers.ServerError(e, nil)
···
s.logger.Error("error updating repo after commit", "error", err)
return helpers.ServerError(e, nil)
}
-
-
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
-
RepoHandle: &atproto.SyncSubscribeRepos_Handle{
-
Did: urepo.Did,
-
Handle: request.Handle,
-
Seq: time.Now().UnixMicro(), // TODO: no
-
Time: time.Now().Format(util.ISO8601),
-
},
-
})
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
RepoIdentity: &atproto.SyncSubscribeRepos_Identity{
+2 -2
server/handle_server_create_session.go
···
Email: repo.Email,
EmailConfirmed: repo.EmailConfirmedAt != nil,
EmailAuthFactor: false,
-
Active: true, // TODO: eventually do takedowns
-
Status: nil, // TODO eventually do takedowns
+
Active: repo.Active(),
+
Status: repo.Status(),
})
}
+46
server/handle_server_deactivate_account.go
···
+
package server
+
+
import (
+
"context"
+
"time"
+
+
"github.com/Azure/go-autorest/autorest/to"
+
"github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/events"
+
"github.com/bluesky-social/indigo/util"
+
"github.com/haileyok/cocoon/internal/helpers"
+
"github.com/haileyok/cocoon/models"
+
"github.com/labstack/echo/v4"
+
)
+
+
type ComAtprotoServerDeactivateAccountRequest struct {
+
// NOTE: this implementation will not pay attention to this value
+
DeleteAfter time.Time `json:"deleteAfter"`
+
}
+
+
func (s *Server) handleServerDeactivateAccount(e echo.Context) error {
+
var req ComAtprotoServerDeactivateAccountRequest
+
if err := e.Bind(&req); err != nil {
+
s.logger.Error("error binding", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
urepo := e.Get("repo").(*models.RepoActor)
+
+
if err := s.db.Exec("UPDATE repos SET deactivated = ? WHERE did = ?", nil, true, urepo.Repo.Did).Error; err != nil {
+
s.logger.Error("error updating account status to deactivated", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
+
RepoAccount: &atproto.SyncSubscribeRepos_Account{
+
Active: false,
+
Did: urepo.Repo.Did,
+
Status: to.StringPtr("deactivated"),
+
Seq: time.Now().UnixMicro(), // TODO: bad puppy
+
Time: time.Now().Format(util.ISO8601),
+
},
+
})
+
+
return e.NoContent(200)
+
}
+2 -2
server/handle_server_get_session.go
···
Email: repo.Email,
EmailConfirmed: repo.EmailConfirmedAt != nil,
EmailAuthFactor: false, // TODO: todo todo
-
Active: true,
-
Status: nil,
+
Active: repo.Active(),
+
Status: repo.Status(),
})
}
+2 -2
server/handle_server_refresh_session.go
···
RefreshJwt: sess.RefreshToken,
Handle: repo.Handle,
Did: repo.Repo.Did,
-
Active: true,
-
Status: nil,
+
Active: repo.Active(),
+
Status: repo.Status(),
})
}
+79 -8
server/handle_sync_get_blob.go
···
import (
"bytes"
+
"fmt"
+
"io"
+
"github.com/Azure/go-autorest/autorest/to"
+
"github.com/aws/aws-sdk-go/aws"
+
"github.com/aws/aws-sdk-go/aws/credentials"
+
"github.com/aws/aws-sdk-go/aws/session"
+
"github.com/aws/aws-sdk-go/service/s3"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/models"
"github.com/ipfs/go-cid"
···
return helpers.InputError(e, nil)
}
+
urepo, err := s.getRepoActorByDid(did)
+
if err != nil {
+
s.logger.Error("could not find user for requested blob", "error", err)
+
return helpers.InputError(e, nil)
+
}
+
+
status := urepo.Status()
+
if status != nil {
+
if *status == "deactivated" {
+
return helpers.InputError(e, to.StringPtr("RepoDeactivated"))
+
}
+
}
+
var blob models.Blob
if err := s.db.Raw("SELECT * FROM blobs WHERE did = ? AND cid = ?", nil, did, c.Bytes()).Scan(&blob).Error; err != nil {
s.logger.Error("error looking up blob", "error", err)
···
buf := new(bytes.Buffer)
-
var parts []models.BlobPart
-
if err := s.db.Raw("SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
-
s.logger.Error("error getting blob parts", "error", err)
+
if blob.Storage == "sqlite" {
+
var parts []models.BlobPart
+
if err := s.db.Raw("SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
+
s.logger.Error("error getting blob parts", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
// TODO: we can just stream this, don't need to make a buffer
+
for _, p := range parts {
+
buf.Write(p.Data)
+
}
+
} else if blob.Storage == "s3" && s.s3Config != nil && s.s3Config.BlobstoreEnabled {
+
config := &aws.Config{
+
Region: aws.String(s.s3Config.Region),
+
Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
+
}
+
+
if s.s3Config.Endpoint != "" {
+
config.Endpoint = aws.String(s.s3Config.Endpoint)
+
config.S3ForcePathStyle = aws.Bool(true)
+
}
+
+
sess, err := session.NewSession(config)
+
if err != nil {
+
s.logger.Error("error creating aws session", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
svc := s3.New(sess)
+
if result, err := svc.GetObject(&s3.GetObjectInput{
+
Bucket: aws.String(s.s3Config.Bucket),
+
Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
+
}); err != nil {
+
s.logger.Error("error getting blob from s3", "error", err)
+
return helpers.ServerError(e, nil)
+
} else {
+
read := 0
+
part := 0
+
partBuf := make([]byte, 0x10000)
+
+
for {
+
n, err := io.ReadFull(result.Body, partBuf)
+
if err == io.ErrUnexpectedEOF || err == io.EOF {
+
if n == 0 {
+
break
+
}
+
} else if err != nil && err != io.ErrUnexpectedEOF {
+
s.logger.Error("error reading blob", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
data := partBuf[:n]
+
read += n
+
buf.Write(data)
+
part++
+
}
+
}
+
} else {
+
s.logger.Error("unknown storage", "storage", blob.Storage)
return helpers.ServerError(e, nil)
-
}
-
-
// TODO: we can just stream this, don't need to make a buffer
-
for _, p := range parts {
-
buf.Write(p.Data)
}
e.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+c.String())
+2 -2
server/handle_sync_get_repo_status.go
···
return e.JSON(200, ComAtprotoSyncGetRepoStatusResponse{
Did: urepo.Repo.Did,
-
Active: true,
-
Status: nil,
+
Active: urepo.Active(),
+
Status: urepo.Status(),
Rev: &urepo.Rev,
})
}
+14
server/handle_sync_list_blobs.go
···
package server
import (
+
"github.com/Azure/go-autorest/autorest/to"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/models"
"github.com/ipfs/go-cid"
···
cursorquery = "AND created_at < ?"
}
params = append(params, limit)
+
+
urepo, err := s.getRepoActorByDid(did)
+
if err != nil {
+
s.logger.Error("could not find user for requested blobs", "error", err)
+
return helpers.InputError(e, nil)
+
}
+
+
status := urepo.Status()
+
if status != nil {
+
if *status == "deactivated" {
+
return helpers.InputError(e, to.StringPtr("RepoDeactivated"))
+
}
+
}
var blobs []models.Blob
if err := s.db.Raw("SELECT * FROM blobs WHERE did = ? "+cursorquery+" ORDER BY created_at DESC LIMIT ?", nil, params...).Scan(&blobs).Error; err != nil {
-18
server/handle_sync_subscribe_repos.go
···
import (
"fmt"
-
"net/http"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/lex/util"
"github.com/btcsuite/websocket"
"github.com/labstack/echo/v4"
)
-
-
var upgrader = websocket.Upgrader{
-
ReadBufferSize: 1024,
-
WriteBufferSize: 1024,
-
CheckOrigin: func(r *http.Request) bool {
-
return true
-
},
-
}
func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
···
case evt.RepoCommit != nil:
header.MsgType = "#commit"
obj = evt.RepoCommit
-
case evt.RepoHandle != nil:
-
header.MsgType = "#handle"
-
obj = evt.RepoHandle
case evt.RepoIdentity != nil:
header.MsgType = "#identity"
obj = evt.RepoIdentity
···
case evt.RepoInfo != nil:
header.MsgType = "#info"
obj = evt.RepoInfo
-
case evt.RepoMigrate != nil:
-
header.MsgType = "#migrate"
-
obj = evt.RepoMigrate
-
case evt.RepoTombstone != nil:
-
header.MsgType = "#tombstone"
-
obj = evt.RepoTombstone
default:
return fmt.Errorf("unrecognized event kind")
}
+8 -1
server/middleware.go
···
import (
"crypto/sha256"
"encoding/base64"
+
"errors"
"fmt"
"strings"
"time"
···
"github.com/golang-jwt/jwt/v4"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/models"
+
"github.com/haileyok/cocoon/oauth/dpop"
"github.com/haileyok/cocoon/oauth/provider"
"github.com/labstack/echo/v4"
"gitlab.com/yawning/secp256k1-voi"
···
proof, err := s.oauthProvider.DpopManager.CheckProof(e.Request().Method, "https://"+s.config.Hostname+e.Request().URL.String(), e.Request().Header, to.StringPtr(accessToken))
if err != nil {
+
if errors.Is(err, dpop.ErrUseDpopNonce) {
+
return e.JSON(400, map[string]string{
+
"error": "use_dpop_nonce",
+
})
+
}
s.logger.Error("invalid dpop proof", "error", err)
-
return helpers.InputError(e, to.StringPtr(err.Error()))
+
return helpers.InputError(e, nil)
}
var oauthToken provider.OauthToken
+13 -7
server/repo.go
···
"github.com/Azure/go-autorest/autorest/to"
"github.com/bluesky-social/indigo/api/atproto"
-
"github.com/bluesky-social/indigo/atproto/data"
+
"github.com/bluesky-social/indigo/atproto/atdata"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/carstore"
"github.com/bluesky-social/indigo/events"
···
}
func (mm *MarshalableMap) MarshalCBOR(w io.Writer) error {
-
data, err := data.MarshalCBOR(*mm)
+
data, err := atdata.MarshalCBOR(*mm)
if err != nil {
return err
}
···
if err != nil {
return nil, err
}
-
out, err := data.UnmarshalJSON(j)
+
out, err := atdata.UnmarshalJSON(j)
if err != nil {
return nil, err
}
mm := MarshalableMap(out)
+
+
// HACK: if a record doesn't contain a $type, we can manually set it here based on the op's collection
+
if mm["$type"] == "" {
+
mm["$type"] = op.Collection
+
}
+
nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm)
if err != nil {
return nil, err
}
-
d, err := data.MarshalCBOR(mm)
+
d, err := atdata.MarshalCBOR(mm)
if err != nil {
return nil, err
}
···
if err != nil {
return nil, err
}
-
out, err := data.UnmarshalJSON(j)
+
out, err := atdata.UnmarshalJSON(j)
if err != nil {
return nil, err
}
···
if err != nil {
return nil, err
}
-
d, err := data.MarshalCBOR(mm)
+
d, err := atdata.MarshalCBOR(mm)
if err != nil {
return nil, err
}
···
func getBlobCidsFromCbor(cbor []byte) ([]cid.Cid, error) {
var cids []cid.Cid
-
decoded, err := data.UnmarshalCBOR(cbor)
+
decoded, err := atdata.UnmarshalCBOR(cbor)
if err != nil {
return nil, fmt.Errorf("error unmarshaling cbor: %w", err)
}
+35 -32
server/server.go
···
)
type S3Config struct {
-
BackupsEnabled bool
-
Endpoint string
-
Region string
-
Bucket string
-
AccessKey string
-
SecretKey string
+
BackupsEnabled bool
+
BlobstoreEnabled bool
+
Endpoint string
+
Region string
+
Bucket string
+
AccessKey string
+
SecretKey string
}
type Server struct {
···
oauthProvider *provider.Provider
evtman *events.EventManager
passport *identity.Passport
+
fallbackProxy string
dbName string
s3Config *S3Config
···
SessionSecret string
-
DefaultAtprotoProxy string
-
BlockstoreVariant BlockstoreVariant
+
FallbackProxy string
}
type config struct {
-
Version string
-
Did string
-
Hostname string
-
ContactEmail string
-
EnforcePeering bool
-
Relays []string
-
AdminPassword string
-
SmtpEmail string
-
SmtpName string
-
DefaultAtprotoProxy string
-
BlockstoreVariant BlockstoreVariant
+
Version string
+
Did string
+
Hostname string
+
ContactEmail string
+
EnforcePeering bool
+
Relays []string
+
AdminPassword string
+
SmtpEmail string
+
SmtpName string
+
BlockstoreVariant BlockstoreVariant
+
FallbackProxy string
}
type CustomValidator struct {
···
IdleTimeout: 5 * time.Minute,
}
-
gdb, err := gorm.Open(sqlite.Open("cocoon.db"), &gorm.Config{})
+
gdb, err := gorm.Open(sqlite.Open(args.DbName), &gorm.Config{})
if err != nil {
return nil, err
}
···
plcClient: plcClient,
privateKey: &pkey,
config: &config{
-
Version: args.Version,
-
Did: args.Did,
-
Hostname: args.Hostname,
-
ContactEmail: args.ContactEmail,
-
EnforcePeering: false,
-
Relays: args.Relays,
-
AdminPassword: args.AdminPassword,
-
SmtpName: args.SmtpName,
-
SmtpEmail: args.SmtpEmail,
-
DefaultAtprotoProxy: args.DefaultAtprotoProxy,
-
BlockstoreVariant: args.BlockstoreVariant,
+
Version: args.Version,
+
Did: args.Did,
+
Hostname: args.Hostname,
+
ContactEmail: args.ContactEmail,
+
EnforcePeering: false,
+
Relays: args.Relays,
+
AdminPassword: args.AdminPassword,
+
SmtpName: args.SmtpName,
+
SmtpEmail: args.SmtpEmail,
+
BlockstoreVariant: args.BlockstoreVariant,
+
FallbackProxy: args.FallbackProxy,
},
evtman: events.NewEventManager(events.NewMemPersister()),
passport: identity.NewPassport(h, identity.NewMemCache(10_000)),
···
// TODO: should validate these args
if args.SmtpUser == "" || args.SmtpPass == "" || args.SmtpHost == "" || args.SmtpPort == "" || args.SmtpEmail == "" || args.SmtpName == "" {
-
args.Logger.Warn("not enough smpt args were provided. mailing will not work for your server.")
+
args.Logger.Warn("not enough smtp args were provided. mailing will not work for your server.")
} else {
mail := mailyak.New(args.SmtpHost+":"+args.SmtpPort, smtp.PlainAuth("", args.SmtpUser, args.SmtpPass, args.SmtpHost))
mail.From(s.config.SmtpEmail)
···
s.echo.POST("/xrpc/com.atproto.server.updateEmail", s.handleServerUpdateEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
s.echo.GET("/xrpc/com.atproto.server.getServiceAuth", s.handleServerGetServiceAuth, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
s.echo.GET("/xrpc/com.atproto.server.checkAccountStatus", s.handleServerCheckAccountStatus, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
+
s.echo.POST("/xrpc/com.atproto.server.deactivateAccount", s.handleServerDeactivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
+
s.echo.POST("/xrpc/com.atproto.server.activateAccount", s.handleServerActivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
// repo
s.echo.POST("/xrpc/com.atproto.repo.createRecord", s.handleCreateRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)