package main import ( "archive/zip" "context" "encoding/json" "fmt" "io" "log/slog" "net/http" "os" "github.com/bugsnag/bugsnag-go/v2" "github.com/minio/minio-go/v7" ) func backupPDS(ctx context.Context, minioClient *minio.Client, bucketName string) { if os.Getenv("PDS_HOST") == "" || os.Getenv("DID") == "" { slog.Info("PDS_HOST or DID env not set - skipping PDS backup") return } err := backupRepo(ctx, minioClient, bucketName) if err != nil { slog.Error("backup repo", "error", err) bugsnag.Notify(err) return } err = backupBlobs(ctx, minioClient, bucketName) if err != nil { slog.Error("backup blobs", "error", err) bugsnag.Notify(err) return } } func backupRepo(ctx context.Context, minioClient *minio.Client, bucketName string) error { pdsHost := os.Getenv("PDS_HOST") did := os.Getenv("DID") url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", pdsHost, did) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return fmt.Errorf("create get repo request: %w", err) } req.Header.Add("ACCEPT", "application/vnd.ipld.car") resp, err := http.DefaultClient.Do(req) if err != nil { return fmt.Errorf("get repo: %w", err) } defer resp.Body.Close() _, err = minioClient.PutObject(ctx, bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{}) if err != nil { return fmt.Errorf("stream repo to bucket: %w", err) } return nil } func backupBlobs(ctx context.Context, minioClient *minio.Client, bucketName string) error { cids, err := getAllBlobCIDs(ctx) if err != nil { return fmt.Errorf("get all blob CIDs: %w", err) } reader, writer := io.Pipe() defer reader.Close() zipWriter := zip.NewWriter(writer) go func() { defer writer.Close() defer zipWriter.Close() for _, cid := range cids { slog.Info("processing cid", "cid", cid) blob, err := getBlob(ctx, cid) if err != nil { slog.Error("failed to get blob", "cid", cid, "error", err) bugsnag.Notify(err) continue } zipFile, err := zipWriter.Create(cid) if err != nil { slog.Error("create new file in zipwriter", "cid", cid, "error", err) bugsnag.Notify(err) blob.Close() continue } io.Copy(zipFile, blob) blob.Close() } }() _, err = minioClient.PutObject(ctx, bucketName, "pds-blobs.zip", reader, -1, minio.PutObjectOptions{}) if err != nil { return fmt.Errorf("stream blobs to bucket: %w", err) } return nil } func getAllBlobCIDs(ctx context.Context) ([]string, error) { cursor := "" limit := 100 var cids []string for { res, err := listBlobs(ctx, cursor, int64(limit)) if err != nil { return nil, fmt.Errorf("list blobs: %w", err) } if len(res.CIDs) == 0 { return cids, nil } cids = append(cids, res.CIDs...) if len(res.CIDs) < limit { return cids, nil } cursor = res.Cursor } } type listBlobsResponse struct { Cursor string `json:"cursor"` CIDs []string `json:"cids"` } func listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { pdsHost := os.Getenv("PDS_HOST") did := os.Getenv("DID") // TODO: do proper url encoding of query params url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", pdsHost, did, cursor, limit) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) } resp, err := http.DefaultClient.Do(req) if err != nil { return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) } defer resp.Body.Close() resBody, err := io.ReadAll(resp.Body) if err != nil { return listBlobsResponse{}, fmt.Errorf("failed to read response: %w", err) } var result listBlobsResponse err = json.Unmarshal(resBody, &result) if err != nil { return listBlobsResponse{}, fmt.Errorf("failed to unmarshal response: %w", err) } return result, nil } func getBlob(ctx context.Context, cid string) (io.ReadCloser, error) { pdsHost := os.Getenv("PDS_HOST") did := os.Getenv("DID") // TODO: do proper url encoding of query params url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", pdsHost, did, cid) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, fmt.Errorf("create get blob request: %w", err) } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, fmt.Errorf("get blob: %w", err) } return resp.Body, nil }