A tool for backing up ATProto-related data to S3.
1package main
2
import (
	"archive/zip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"os"

	"github.com/bugsnag/bugsnag-go/v2"
	"github.com/minio/minio-go/v7"
)
16
17func backupPDS(ctx context.Context, minioClient *minio.Client, bucketName string) {
18 if os.Getenv("PDS_HOST") == "" || os.Getenv("DID") == "" {
19 slog.Info("PDS_HOST or DID env not set - skipping PDS backup")
20 return
21 }
22
23 err := backupRepo(ctx, minioClient, bucketName)
24 if err != nil {
25 slog.Error("backup repo", "error", err)
26 bugsnag.Notify(err)
27 return
28 }
29
30 err = backupBlobs(ctx, minioClient, bucketName)
31 if err != nil {
32 slog.Error("backup blobs", "error", err)
33 bugsnag.Notify(err)
34 return
35 }
36}
37
38func backupRepo(ctx context.Context, minioClient *minio.Client, bucketName string) error {
39 pdsHost := os.Getenv("PDS_HOST")
40 did := os.Getenv("DID")
41
42 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", pdsHost, did)
43 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
44 if err != nil {
45 return fmt.Errorf("create get repo request: %w", err)
46 }
47
48 req.Header.Add("ACCEPT", "application/vnd.ipld.car")
49 resp, err := http.DefaultClient.Do(req)
50 if err != nil {
51 return fmt.Errorf("get repo: %w", err)
52 }
53
54 defer resp.Body.Close()
55
56 _, err = minioClient.PutObject(ctx, bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{})
57 if err != nil {
58 return fmt.Errorf("stream repo to bucket: %w", err)
59 }
60
61 return nil
62}
63
64func backupBlobs(ctx context.Context, minioClient *minio.Client, bucketName string) error {
65 cids, err := getAllBlobCIDs(ctx)
66 if err != nil {
67 return fmt.Errorf("get all blob CIDs: %w", err)
68 }
69
70 reader, writer := io.Pipe()
71 defer reader.Close()
72
73 zipWriter := zip.NewWriter(writer)
74
75 go func() {
76 defer writer.Close()
77 defer zipWriter.Close()
78
79 for _, cid := range cids {
80 slog.Info("processing cid", "cid", cid)
81 blob, err := getBlob(ctx, cid)
82 if err != nil {
83 slog.Error("failed to get blob", "cid", cid, "error", err)
84 bugsnag.Notify(err)
85 continue
86 }
87
88 zipFile, err := zipWriter.Create(cid)
89 if err != nil {
90 slog.Error("create new file in zipwriter", "cid", cid, "error", err)
91 bugsnag.Notify(err)
92 blob.Close()
93 continue
94 }
95
96 io.Copy(zipFile, blob)
97 blob.Close()
98 }
99 }()
100
101 _, err = minioClient.PutObject(ctx, bucketName, "pds-blobs.zip", reader, -1, minio.PutObjectOptions{})
102 if err != nil {
103 return fmt.Errorf("stream blobs to bucket: %w", err)
104 }
105
106 return nil
107}
108
109func getAllBlobCIDs(ctx context.Context) ([]string, error) {
110 cursor := ""
111 limit := 100
112 var cids []string
113 for {
114 res, err := listBlobs(ctx, cursor, int64(limit))
115 if err != nil {
116 return nil, fmt.Errorf("list blobs: %w", err)
117 }
118 if len(res.CIDs) == 0 {
119 return cids, nil
120 }
121
122 cids = append(cids, res.CIDs...)
123
124 if len(res.CIDs) < limit {
125 return cids, nil
126 }
127
128 cursor = res.Cursor
129 }
130}
131
// listBlobsResponse is the decoded JSON body of a
// com.atproto.sync.listBlobs response, as produced by listBlobs.
type listBlobsResponse struct {
	// Cursor is fed back into the next listBlobs call by getAllBlobCIDs to
	// request the following page.
	Cursor string `json:"cursor"`
	// CIDs holds the blob identifiers contained in this page.
	CIDs []string `json:"cids"`
}
136
137func listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) {
138 pdsHost := os.Getenv("PDS_HOST")
139 did := os.Getenv("DID")
140
141 // TODO: do proper url encoding of query params
142 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", pdsHost, did, cursor, limit)
143 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
144 if err != nil {
145 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err)
146 }
147
148 resp, err := http.DefaultClient.Do(req)
149 if err != nil {
150 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err)
151 }
152
153 defer resp.Body.Close()
154
155 resBody, err := io.ReadAll(resp.Body)
156 if err != nil {
157 return listBlobsResponse{}, fmt.Errorf("failed to read response: %w", err)
158 }
159
160 var result listBlobsResponse
161 err = json.Unmarshal(resBody, &result)
162 if err != nil {
163 return listBlobsResponse{}, fmt.Errorf("failed to unmarshal response: %w", err)
164 }
165
166 return result, nil
167}
168
// getBlob requests the blob identified by cid for the account named by the
// DID environment variable from the PDS at PDS_HOST (via
// com.atproto.sync.getBlob) and returns the response body as a stream.
//
// On success the caller owns the returned body and must close it.
//
// It returns an error (and a nil body) when the request cannot be built or
// sent, or when the PDS answers with a status other than 200 OK, so an error
// payload is never handed back as if it were blob content.
func getBlob(ctx context.Context, cid string) (io.ReadCloser, error) {
	pdsHost := os.Getenv("PDS_HOST")
	did := os.Getenv("DID")

	// Encode the query so that reserved characters in the DID or CID cannot
	// alter the request.
	query := url.Values{}
	query.Set("did", did)
	query.Set("cid", cid)
	endpoint := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?%s", pdsHost, query.Encode())

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("create get blob request: %w", err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("get blob: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		// The body is not returned on this path, so it is closed here.
		resp.Body.Close()
		return nil, fmt.Errorf("get blob: unexpected status %s", resp.Status)
	}

	return resp.Body, nil
}