An atproto PDS written in Go
at main 3.6 kB view raw
1package server 2 3import ( 4 "bytes" 5 "fmt" 6 "io" 7 8 "github.com/Azure/go-autorest/autorest/to" 9 "github.com/aws/aws-sdk-go/aws" 10 "github.com/aws/aws-sdk-go/aws/credentials" 11 "github.com/aws/aws-sdk-go/aws/session" 12 "github.com/aws/aws-sdk-go/service/s3" 13 "github.com/haileyok/cocoon/internal/helpers" 14 "github.com/haileyok/cocoon/models" 15 "github.com/ipfs/go-cid" 16 "github.com/labstack/echo/v4" 17) 18 19func (s *Server) handleSyncGetBlob(e echo.Context) error { 20 did := e.QueryParam("did") 21 if did == "" { 22 return helpers.InputError(e, nil) 23 } 24 25 cstr := e.QueryParam("cid") 26 if cstr == "" { 27 return helpers.InputError(e, nil) 28 } 29 30 c, err := cid.Parse(cstr) 31 if err != nil { 32 return helpers.InputError(e, nil) 33 } 34 35 urepo, err := s.getRepoActorByDid(did) 36 if err != nil { 37 s.logger.Error("could not find user for requested blob", "error", err) 38 return helpers.InputError(e, nil) 39 } 40 41 status := urepo.Status() 42 if status != nil { 43 if *status == "deactivated" { 44 return helpers.InputError(e, to.StringPtr("RepoDeactivated")) 45 } 46 } 47 48 var blob models.Blob 49 if err := s.db.Raw("SELECT * FROM blobs WHERE did = ? AND cid = ?", nil, did, c.Bytes()).Scan(&blob).Error; err != nil { 50 s.logger.Error("error looking up blob", "error", err) 51 return helpers.ServerError(e, nil) 52 } 53 54 buf := new(bytes.Buffer) 55 56 if blob.Storage == "sqlite" { 57 var parts []models.BlobPart 58 if err := s.db.Raw("SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil { 59 s.logger.Error("error getting blob parts", "error", err) 60 return helpers.ServerError(e, nil) 61 } 62 63 // TODO: we can just stream this, don't need to make a buffer 64 for _, p := range parts { 65 buf.Write(p.Data) 66 } 67 } else if blob.Storage == "s3" { 68 if !(s.s3Config != nil && s.s3Config.BlobstoreEnabled) { 69 s.logger.Error("s3 storage disabled") 70 return helpers.ServerError(e, nil) 71 } 72 73 blobKey := fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String()) 74 75 if s.s3Config.CDNUrl != "" { 76 redirectUrl := fmt.Sprintf("%s/%s", s.s3Config.CDNUrl, blobKey) 77 return e.Redirect(302, redirectUrl) 78 } 79 80 config := &aws.Config{ 81 Region: aws.String(s.s3Config.Region), 82 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 83 } 84 85 if s.s3Config.Endpoint != "" { 86 config.Endpoint = aws.String(s.s3Config.Endpoint) 87 config.S3ForcePathStyle = aws.Bool(true) 88 } 89 90 sess, err := session.NewSession(config) 91 if err != nil { 92 s.logger.Error("error creating aws session", "error", err) 93 return helpers.ServerError(e, nil) 94 } 95 96 svc := s3.New(sess) 97 if result, err := svc.GetObject(&s3.GetObjectInput{ 98 Bucket: aws.String(s.s3Config.Bucket), 99 Key: aws.String(blobKey), 100 }); err != nil { 101 s.logger.Error("error getting blob from s3", "error", err) 102 return helpers.ServerError(e, nil) 103 } else { 104 read := 0 105 part := 0 106 partBuf := make([]byte, 0x10000) 107 108 for { 109 n, err := io.ReadFull(result.Body, partBuf) 110 if err == io.ErrUnexpectedEOF || err == io.EOF { 111 if n == 0 { 112 break 113 } 114 } else if err != nil && err != io.ErrUnexpectedEOF { 115 s.logger.Error("error reading blob", "error", err) 116 return helpers.ServerError(e, nil) 117 } 118 119 data := partBuf[:n] 120 read += n 121 buf.Write(data) 122 part++ 123 } 124 } 125 } else { 126 s.logger.Error("unknown storage", "storage", blob.Storage) 127 return helpers.ServerError(e, nil) 128 } 129 130 e.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+c.String()) 131 132 return e.Stream(200, "application/octet-stream", buf) 133}