A tool for backing up ATProto-related data to S3.
1package main
2
import (
	"archive/zip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"os"

	"github.com/joho/godotenv"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)
17
18func main() {
19 ctx := context.Background()
20
21 err := godotenv.Load(".env")
22 if err != nil {
23 if !os.IsNotExist(err) {
24 slog.Error("load env", "error", err)
25 return
26 }
27 }
28
29 minioClient, err := createMinioClient()
30 if err != nil {
31 slog.Error("create minio client", "error", err)
32 return
33 }
34
35 bucketName := os.Getenv("BUCKET_NAME")
36
37 err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{})
38 if err != nil {
39 slog.Error("create bucket", "error", err)
40 return
41 }
42
43 err = backupRepo(ctx, minioClient, bucketName)
44 if err != nil {
45 slog.Error("backup repo", "error", err)
46 return
47 }
48
49 err = backupBlobs(ctx, minioClient, bucketName)
50 if err != nil {
51 slog.Error("backup blobs", "error", err)
52 return
53 }
54}
55
56func createMinioClient() (*minio.Client, error) {
57 endpoint := os.Getenv("ENDPOINT")
58 accessKeyID := os.Getenv("ACCESS_ID")
59 secretAccessKey := os.Getenv("SECRET_ACCESS_KEY")
60 useSSL := true
61
62 return minio.New(endpoint, &minio.Options{
63 Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
64 Secure: useSSL,
65 })
66}
67
68func backupRepo(ctx context.Context, minioClient *minio.Client, bucketName string) error {
69 pdsHost := os.Getenv("PDS_HOST")
70 did := os.Getenv("DID")
71
72 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", pdsHost, did)
73 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
74 if err != nil {
75 return fmt.Errorf("create get repo request: %w", err)
76 }
77
78 req.Header.Add("ACCEPT", "application/vnd.ipld.car")
79 resp, err := http.DefaultClient.Do(req)
80 if err != nil {
81 return fmt.Errorf("get repo: %w", err)
82 }
83
84 defer resp.Body.Close()
85
86 _, err = minioClient.PutObject(ctx, bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{})
87 if err != nil {
88 return fmt.Errorf("stream repo to bucket: %w", err)
89 }
90
91 return nil
92}
93
94func backupBlobs(ctx context.Context, minioClient *minio.Client, bucketName string) error {
95 cids, err := getAllBlobCIDs(ctx)
96 if err != nil {
97 return fmt.Errorf("get all blob CIDs: %w", err)
98 }
99
100 reader, writer := io.Pipe()
101 defer reader.Close()
102
103 zipWriter := zip.NewWriter(writer)
104
105 go func() {
106 defer writer.Close()
107 defer zipWriter.Close()
108
109 for _, cid := range cids {
110 slog.Info("processing cid", "cid", cid)
111 blob, err := getBlob(ctx, cid)
112 if err != nil {
113 slog.Error("failed to get blob", "cid", cid, "error", err)
114 continue
115 }
116
117 zipFile, err := zipWriter.Create(cid)
118 if err != nil {
119 slog.Error("create new file in zipwriter", "cid", cid, "error", err)
120 blob.Close()
121 continue
122 }
123
124 io.Copy(zipFile, blob)
125 blob.Close()
126 }
127 }()
128
129 _, err = minioClient.PutObject(ctx, bucketName, "pds-blobs.zip", reader, -1, minio.PutObjectOptions{})
130 if err != nil {
131 return fmt.Errorf("stream blobs to bucket: %w", err)
132 }
133
134 return nil
135}
136
137func getAllBlobCIDs(ctx context.Context) ([]string, error) {
138 cursor := ""
139 limit := 100
140 var cids []string
141 for {
142 res, err := listBlobs(ctx, cursor, int64(limit))
143 if err != nil {
144 return nil, fmt.Errorf("list blobs: %w", err)
145 }
146 if len(res.CIDs) == 0 {
147 return cids, nil
148 }
149
150 cids = append(cids, res.CIDs...)
151
152 if len(res.CIDs) < limit {
153 return cids, nil
154 }
155
156 cursor = res.Cursor
157 }
158}
159
// listBlobsResponse is the JSON body returned by com.atproto.sync.listBlobs.
type listBlobsResponse struct {
	// Cursor is the pagination token for the next page; empty when absent.
	Cursor string `json:"cursor"`
	// CIDs are the blob identifiers contained in this page.
	CIDs []string `json:"cids"`
}

// listBlobs fetches one page of blob CIDs for the account named by the DID
// environment variable from the PDS at PDS_HOST.
//
// cursor is the pagination token from the previous page; it is omitted from
// the request when empty. limit is the requested page size. All query
// parameters are URL-encoded.
//
// It returns an error when the request cannot be built or sent, when the PDS
// answers with anything other than 200 OK, or when the body is not valid JSON.
func listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) {
	pdsHost := os.Getenv("PDS_HOST")

	query := url.Values{}
	query.Set("did", os.Getenv("DID"))
	query.Set("limit", fmt.Sprint(limit))
	if cursor != "" {
		query.Set("cursor", cursor)
	}
	endpoint := pdsHost + "/xrpc/com.atproto.sync.listBlobs?" + query.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err)
	}
	defer resp.Body.Close()

	// An error body would otherwise decode into an empty result and be
	// mistaken for "no blobs", silently producing an empty backup.
	if resp.StatusCode != http.StatusOK {
		return listBlobsResponse{}, fmt.Errorf("list blobs: unexpected status %s", resp.Status)
	}

	resBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return listBlobsResponse{}, fmt.Errorf("failed to read response: %w", err)
	}

	var result listBlobsResponse
	if err := json.Unmarshal(resBody, &result); err != nil {
		return listBlobsResponse{}, fmt.Errorf("failed to unmarshal response: %w", err)
	}

	return result, nil
}
196
// getBlob requests the blob identified by cid for the account named by the DID
// environment variable from the PDS at PDS_HOST and returns the response body
// for streaming. Query parameters are URL-encoded.
//
// On success the caller owns the returned reader and must close it. When the
// request fails or the PDS answers with anything other than 200 OK, the body
// is closed here and a nil reader is returned together with the error.
func getBlob(ctx context.Context, cid string) (io.ReadCloser, error) {
	pdsHost := os.Getenv("PDS_HOST")

	query := url.Values{}
	query.Set("did", os.Getenv("DID"))
	query.Set("cid", cid)
	endpoint := pdsHost + "/xrpc/com.atproto.sync.getBlob?" + query.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("create get blob request: %w", err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("get blob: %w", err)
	}

	// An error body must not be handed back as if it were blob content.
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("get blob: unexpected status %s", resp.Status)
	}

	return resp.Body, nil
}