A tool for backing up ATProto-related data to S3.
at main 4.5 kB view raw
1package main 2 3import ( 4 "archive/zip" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "os" 12 13 "github.com/bugsnag/bugsnag-go/v2" 14 "github.com/minio/minio-go/v7" 15) 16 17func backupPDS(ctx context.Context, minioClient *minio.Client, bucketName string) { 18 if os.Getenv("PDS_HOST") == "" || os.Getenv("DID") == "" { 19 slog.Info("PDS_HOST or DID env not set - skipping PDS backup") 20 return 21 } 22 23 err := backupRepo(ctx, minioClient, bucketName) 24 if err != nil { 25 slog.Error("backup repo", "error", err) 26 bugsnag.Notify(err) 27 return 28 } 29 30 err = backupBlobs(ctx, minioClient, bucketName) 31 if err != nil { 32 slog.Error("backup blobs", "error", err) 33 bugsnag.Notify(err) 34 return 35 } 36} 37 38func backupRepo(ctx context.Context, minioClient *minio.Client, bucketName string) error { 39 pdsHost := os.Getenv("PDS_HOST") 40 did := os.Getenv("DID") 41 42 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", pdsHost, did) 43 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 44 if err != nil { 45 return fmt.Errorf("create get repo request: %w", err) 46 } 47 48 req.Header.Add("ACCEPT", "application/vnd.ipld.car") 49 resp, err := http.DefaultClient.Do(req) 50 if err != nil { 51 return fmt.Errorf("get repo: %w", err) 52 } 53 54 defer resp.Body.Close() 55 56 _, err = minioClient.PutObject(ctx, bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{}) 57 if err != nil { 58 return fmt.Errorf("stream repo to bucket: %w", err) 59 } 60 61 return nil 62} 63 64func backupBlobs(ctx context.Context, minioClient *minio.Client, bucketName string) error { 65 cids, err := getAllBlobCIDs(ctx) 66 if err != nil { 67 return fmt.Errorf("get all blob CIDs: %w", err) 68 } 69 70 reader, writer := io.Pipe() 71 defer reader.Close() 72 73 zipWriter := zip.NewWriter(writer) 74 75 go func() { 76 defer writer.Close() 77 defer zipWriter.Close() 78 79 for _, cid := range cids { 80 slog.Info("processing cid", "cid", cid) 81 blob, 
err := getBlob(ctx, cid) 82 if err != nil { 83 slog.Error("failed to get blob", "cid", cid, "error", err) 84 bugsnag.Notify(err) 85 continue 86 } 87 88 zipFile, err := zipWriter.Create(cid) 89 if err != nil { 90 slog.Error("create new file in zipwriter", "cid", cid, "error", err) 91 bugsnag.Notify(err) 92 blob.Close() 93 continue 94 } 95 96 io.Copy(zipFile, blob) 97 blob.Close() 98 } 99 }() 100 101 _, err = minioClient.PutObject(ctx, bucketName, "pds-blobs.zip", reader, -1, minio.PutObjectOptions{}) 102 if err != nil { 103 return fmt.Errorf("stream blobs to bucket: %w", err) 104 } 105 106 return nil 107} 108 109func getAllBlobCIDs(ctx context.Context) ([]string, error) { 110 cursor := "" 111 limit := 100 112 var cids []string 113 for { 114 res, err := listBlobs(ctx, cursor, int64(limit)) 115 if err != nil { 116 return nil, fmt.Errorf("list blobs: %w", err) 117 } 118 if len(res.CIDs) == 0 { 119 return cids, nil 120 } 121 122 cids = append(cids, res.CIDs...) 123 124 if len(res.CIDs) < limit { 125 return cids, nil 126 } 127 128 cursor = res.Cursor 129 } 130} 131 132type listBlobsResponse struct { 133 Cursor string `json:"cursor"` 134 CIDs []string `json:"cids"` 135} 136 137func listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 138 pdsHost := os.Getenv("PDS_HOST") 139 did := os.Getenv("DID") 140 141 // TODO: do proper url encoding of query params 142 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", pdsHost, did, cursor, limit) 143 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 144 if err != nil { 145 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) 146 } 147 148 resp, err := http.DefaultClient.Do(req) 149 if err != nil { 150 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) 151 } 152 153 defer resp.Body.Close() 154 155 resBody, err := io.ReadAll(resp.Body) 156 if err != nil { 157 return listBlobsResponse{}, fmt.Errorf("failed to read response: 
%w", err) 158 } 159 160 var result listBlobsResponse 161 err = json.Unmarshal(resBody, &result) 162 if err != nil { 163 return listBlobsResponse{}, fmt.Errorf("failed to unmarshal response: %w", err) 164 } 165 166 return result, nil 167} 168 169func getBlob(ctx context.Context, cid string) (io.ReadCloser, error) { 170 pdsHost := os.Getenv("PDS_HOST") 171 did := os.Getenv("DID") 172 173 // TODO: do proper url encoding of query params 174 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", pdsHost, did, cid) 175 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 176 if err != nil { 177 return nil, fmt.Errorf("create get blob request: %w", err) 178 } 179 180 resp, err := http.DefaultClient.Do(req) 181 if err != nil { 182 return nil, fmt.Errorf("get blob: %w", err) 183 } 184 185 return resp.Body, nil 186}