An atproto PDS written in Go
at main 2.7 kB view raw
1package server 2 3import ( 4 "fmt" 5 "strconv" 6 7 "github.com/bluesky-social/indigo/atproto/atdata" 8 "github.com/haileyok/cocoon/internal/helpers" 9 "github.com/haileyok/cocoon/models" 10 "github.com/ipfs/go-cid" 11 "github.com/labstack/echo/v4" 12) 13 14type ComAtprotoRepoListMissingBlobsResponse struct { 15 Cursor *string `json:"cursor,omitempty"` 16 Blobs []ComAtprotoRepoListMissingBlobsRecordBlob `json:"blobs"` 17} 18 19type ComAtprotoRepoListMissingBlobsRecordBlob struct { 20 Cid string `json:"cid"` 21 RecordUri string `json:"recordUri"` 22} 23 24func (s *Server) handleListMissingBlobs(e echo.Context) error { 25 urepo := e.Get("repo").(*models.RepoActor) 26 27 limitStr := e.QueryParam("limit") 28 cursor := e.QueryParam("cursor") 29 30 limit := 500 31 if limitStr != "" { 32 if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 { 33 limit = l 34 } 35 } 36 37 var records []models.Record 38 if err := s.db.Raw("SELECT * FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&records).Error; err != nil { 39 s.logger.Error("failed to get records for listMissingBlobs", "error", err) 40 return helpers.ServerError(e, nil) 41 } 42 43 type blobRef struct { 44 cid cid.Cid 45 recordUri string 46 } 47 var allBlobRefs []blobRef 48 49 for _, rec := range records { 50 blobs := getBlobsFromRecord(rec.Value) 51 recordUri := fmt.Sprintf("at://%s/%s/%s", urepo.Repo.Did, rec.Nsid, rec.Rkey) 52 for _, b := range blobs { 53 allBlobRefs = append(allBlobRefs, blobRef{cid: cid.Cid(b.Ref), recordUri: recordUri}) 54 } 55 } 56 57 missingBlobs := make([]ComAtprotoRepoListMissingBlobsRecordBlob, 0) 58 seenCids := make(map[string]bool) 59 60 for _, ref := range allBlobRefs { 61 cidStr := ref.cid.String() 62 63 if seenCids[cidStr] { 64 continue 65 } 66 67 if cursor != "" && cidStr <= cursor { 68 continue 69 } 70 71 var count int64 72 if err := s.db.Raw("SELECT COUNT(*) FROM blobs WHERE did = ? AND cid = ?", nil, urepo.Repo.Did, ref.cid.Bytes()).Scan(&count).Error; err != nil { 73 continue 74 } 75 76 if count == 0 { 77 missingBlobs = append(missingBlobs, ComAtprotoRepoListMissingBlobsRecordBlob{ 78 Cid: cidStr, 79 RecordUri: ref.recordUri, 80 }) 81 seenCids[cidStr] = true 82 83 if len(missingBlobs) >= limit { 84 break 85 } 86 } 87 } 88 89 var nextCursor *string 90 if len(missingBlobs) > 0 && len(missingBlobs) >= limit { 91 lastCid := missingBlobs[len(missingBlobs)-1].Cid 92 nextCursor = &lastCid 93 } 94 95 return e.JSON(200, ComAtprotoRepoListMissingBlobsResponse{ 96 Cursor: nextCursor, 97 Blobs: missingBlobs, 98 }) 99} 100 101func getBlobsFromRecord(data []byte) []atdata.Blob { 102 if len(data) == 0 { 103 return nil 104 } 105 106 decoded, err := atdata.UnmarshalCBOR(data) 107 if err != nil { 108 return nil 109 } 110 111 return atdata.ExtractBlobs(decoded) 112}