// A tool for backing up ATProto-related data (a PDS repository and its blobs) to S3.
1package main 2 3import ( 4 "archive/zip" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "os" 12 13 "github.com/joho/godotenv" 14 "github.com/minio/minio-go/v7" 15 "github.com/minio/minio-go/v7/pkg/credentials" 16) 17 18func main() { 19 ctx := context.Background() 20 21 err := godotenv.Load(".env") 22 if err != nil { 23 if !os.IsNotExist(err) { 24 slog.Error("load env", "error", err) 25 return 26 } 27 } 28 29 minioClient, err := createMinioClient() 30 if err != nil { 31 slog.Error("create minio client", "error", err) 32 return 33 } 34 35 bucketName := os.Getenv("BUCKET_NAME") 36 37 err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{}) 38 if err != nil { 39 slog.Error("create bucket", "error", err) 40 return 41 } 42 43 err = backupRepo(ctx, minioClient, bucketName) 44 if err != nil { 45 slog.Error("backup repo", "error", err) 46 return 47 } 48 49 err = backupBlobs(ctx, minioClient, bucketName) 50 if err != nil { 51 slog.Error("backup blobs", "error", err) 52 return 53 } 54} 55 56func createMinioClient() (*minio.Client, error) { 57 endpoint := os.Getenv("ENDPOINT") 58 accessKeyID := os.Getenv("ACCESS_ID") 59 secretAccessKey := os.Getenv("SECRET_ACCESS_KEY") 60 useSSL := true 61 62 return minio.New(endpoint, &minio.Options{ 63 Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), 64 Secure: useSSL, 65 }) 66} 67 68func backupRepo(ctx context.Context, minioClient *minio.Client, bucketName string) error { 69 pdsHost := os.Getenv("PDS_HOST") 70 did := os.Getenv("DID") 71 72 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", pdsHost, did) 73 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 74 if err != nil { 75 return fmt.Errorf("create get repo request: %w", err) 76 } 77 78 req.Header.Add("ACCEPT", "application/vnd.ipld.car") 79 resp, err := http.DefaultClient.Do(req) 80 if err != nil { 81 return fmt.Errorf("get repo: %w", err) 82 } 83 84 defer resp.Body.Close() 85 86 _, err = 
minioClient.PutObject(ctx, bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{}) 87 if err != nil { 88 return fmt.Errorf("stream repo to bucket: %w", err) 89 } 90 91 return nil 92} 93 94func backupBlobs(ctx context.Context, minioClient *minio.Client, bucketName string) error { 95 cids, err := getAllBlobCIDs(ctx) 96 if err != nil { 97 return fmt.Errorf("get all blob CIDs: %w", err) 98 } 99 100 reader, writer := io.Pipe() 101 defer reader.Close() 102 103 zipWriter := zip.NewWriter(writer) 104 105 go func() { 106 defer writer.Close() 107 defer zipWriter.Close() 108 109 for _, cid := range cids { 110 slog.Info("processing cid", "cid", cid) 111 blob, err := getBlob(ctx, cid) 112 if err != nil { 113 slog.Error("failed to get blob", "cid", cid, "error", err) 114 continue 115 } 116 117 zipFile, err := zipWriter.Create(cid) 118 if err != nil { 119 slog.Error("create new file in zipwriter", "cid", cid, "error", err) 120 blob.Close() 121 continue 122 } 123 124 io.Copy(zipFile, blob) 125 blob.Close() 126 } 127 }() 128 129 _, err = minioClient.PutObject(ctx, bucketName, "pds-blobs.zip", reader, -1, minio.PutObjectOptions{}) 130 if err != nil { 131 return fmt.Errorf("stream blobs to bucket: %w", err) 132 } 133 134 return nil 135} 136 137func getAllBlobCIDs(ctx context.Context) ([]string, error) { 138 cursor := "" 139 limit := 100 140 var cids []string 141 for { 142 res, err := listBlobs(ctx, cursor, int64(limit)) 143 if err != nil { 144 return nil, fmt.Errorf("list blobs: %w", err) 145 } 146 if len(res.CIDs) == 0 { 147 return cids, nil 148 } 149 150 cids = append(cids, res.CIDs...) 
151 152 if len(res.CIDs) < limit { 153 return cids, nil 154 } 155 156 cursor = res.Cursor 157 } 158} 159 160type listBlobsResponse struct { 161 Cursor string `json:"cursor"` 162 CIDs []string `json:"cids"` 163} 164 165func listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 166 pdsHost := os.Getenv("PDS_HOST") 167 did := os.Getenv("DID") 168 169 // TODO: do proper url encoding of query params 170 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", pdsHost, did, cursor, limit) 171 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 172 if err != nil { 173 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) 174 } 175 176 resp, err := http.DefaultClient.Do(req) 177 if err != nil { 178 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) 179 } 180 181 defer resp.Body.Close() 182 183 resBody, err := io.ReadAll(resp.Body) 184 if err != nil { 185 return listBlobsResponse{}, fmt.Errorf("failed to read response: %w", err) 186 } 187 188 var result listBlobsResponse 189 err = json.Unmarshal(resBody, &result) 190 if err != nil { 191 return listBlobsResponse{}, fmt.Errorf("failed to unmarshal response: %w", err) 192 } 193 194 return result, nil 195} 196 197func getBlob(ctx context.Context, cid string) (io.ReadCloser, error) { 198 pdsHost := os.Getenv("PDS_HOST") 199 did := os.Getenv("DID") 200 201 // TODO: do proper url encoding of query params 202 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", pdsHost, did, cid) 203 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 204 if err != nil { 205 return nil, fmt.Errorf("create get blob request: %w", err) 206 } 207 208 resp, err := http.DefaultClient.Do(req) 209 if err != nil { 210 return nil, fmt.Errorf("get blob: %w", err) 211 } 212 213 return resp.Body, nil 214}