An atproto PDS written in Go
at main 3.6 kB view raw
1package server 2 3import ( 4 "bytes" 5 "fmt" 6 "io" 7 8 "github.com/aws/aws-sdk-go/aws" 9 "github.com/aws/aws-sdk-go/aws/credentials" 10 "github.com/aws/aws-sdk-go/aws/session" 11 "github.com/aws/aws-sdk-go/service/s3" 12 "github.com/haileyok/cocoon/internal/helpers" 13 "github.com/haileyok/cocoon/models" 14 "github.com/ipfs/go-cid" 15 "github.com/labstack/echo/v4" 16 "github.com/multiformats/go-multihash" 17) 18 19const ( 20 blockSize = 0x10000 21) 22 23type ComAtprotoRepoUploadBlobResponse struct { 24 Blob struct { 25 Type string `json:"$type"` 26 Ref struct { 27 Link string `json:"$link"` 28 } `json:"ref"` 29 MimeType string `json:"mimeType"` 30 Size int `json:"size"` 31 } `json:"blob"` 32} 33 34func (s *Server) handleRepoUploadBlob(e echo.Context) error { 35 urepo := e.Get("repo").(*models.RepoActor) 36 37 mime := e.Request().Header.Get("content-type") 38 if mime == "" { 39 mime = "application/octet-stream" 40 } 41 42 storage := "sqlite" 43 s3Upload := s.s3Config != nil && s.s3Config.BlobstoreEnabled 44 if s3Upload { 45 storage = "s3" 46 } 47 blob := models.Blob{ 48 Did: urepo.Repo.Did, 49 RefCount: 0, 50 CreatedAt: s.repoman.clock.Next().String(), 51 Storage: storage, 52 } 53 54 if err := s.db.Create(&blob, nil).Error; err != nil { 55 s.logger.Error("error creating new blob in db", "error", err) 56 return helpers.ServerError(e, nil) 57 } 58 59 read := 0 60 part := 0 61 62 buf := make([]byte, 0x10000) 63 fulldata := new(bytes.Buffer) 64 65 for { 66 n, err := io.ReadFull(e.Request().Body, buf) 67 if err == io.ErrUnexpectedEOF || err == io.EOF { 68 if n == 0 { 69 break 70 } 71 } else if err != nil && err != io.ErrUnexpectedEOF { 72 s.logger.Error("error reading blob", "error", err) 73 return helpers.ServerError(e, nil) 74 } 75 76 data := buf[:n] 77 read += n 78 fulldata.Write(data) 79 80 if !s3Upload { 81 blobPart := models.BlobPart{ 82 BlobID: blob.ID, 83 Idx: part, 84 Data: data, 85 } 86 87 if err := s.db.Create(&blobPart, nil).Error; err != nil { 88 s.logger.Error("error adding blob part to db", "error", err) 89 return helpers.ServerError(e, nil) 90 } 91 } 92 part++ 93 94 if n < blockSize { 95 break 96 } 97 } 98 99 c, err := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256).Sum(fulldata.Bytes()) 100 if err != nil { 101 s.logger.Error("error creating cid prefix", "error", err) 102 return helpers.ServerError(e, nil) 103 } 104 105 if s3Upload { 106 config := &aws.Config{ 107 Region: aws.String(s.s3Config.Region), 108 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 109 } 110 111 if s.s3Config.Endpoint != "" { 112 config.Endpoint = aws.String(s.s3Config.Endpoint) 113 config.S3ForcePathStyle = aws.Bool(true) 114 } 115 116 sess, err := session.NewSession(config) 117 if err != nil { 118 s.logger.Error("error creating aws session", "error", err) 119 return helpers.ServerError(e, nil) 120 } 121 122 svc := s3.New(sess) 123 124 if _, err := svc.PutObject(&s3.PutObjectInput{ 125 Bucket: aws.String(s.s3Config.Bucket), 126 Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())), 127 Body: bytes.NewReader(fulldata.Bytes()), 128 }); err != nil { 129 s.logger.Error("error uploading blob to s3", "error", err) 130 return helpers.ServerError(e, nil) 131 } 132 } 133 134 if err := s.db.Exec("UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil { 135 // there should probably be somme handling here if this fails... 136 s.logger.Error("error updating blob", "error", err) 137 return helpers.ServerError(e, nil) 138 } 139 140 resp := ComAtprotoRepoUploadBlobResponse{} 141 resp.Blob.Type = "blob" 142 resp.Blob.Ref.Link = c.String() 143 resp.Blob.MimeType = mime 144 resp.Blob.Size = read 145 146 return e.JSON(200, resp) 147}