1package server
2
3import (
4 "bytes"
5 "fmt"
6 "io"
7
8 "github.com/aws/aws-sdk-go/aws"
9 "github.com/aws/aws-sdk-go/aws/credentials"
10 "github.com/aws/aws-sdk-go/aws/session"
11 "github.com/aws/aws-sdk-go/service/s3"
12 "github.com/haileyok/cocoon/internal/helpers"
13 "github.com/haileyok/cocoon/models"
14 "github.com/ipfs/go-cid"
15 "github.com/labstack/echo/v4"
16 "github.com/multiformats/go-multihash"
17)
18
19const (
20 blockSize = 0x10000
21)
22
23type ComAtprotoRepoUploadBlobResponse struct {
24 Blob struct {
25 Type string `json:"$type"`
26 Ref struct {
27 Link string `json:"$link"`
28 } `json:"ref"`
29 MimeType string `json:"mimeType"`
30 Size int `json:"size"`
31 } `json:"blob"`
32}
33
34func (s *Server) handleRepoUploadBlob(e echo.Context) error {
35 urepo := e.Get("repo").(*models.RepoActor)
36
37 mime := e.Request().Header.Get("content-type")
38 if mime == "" {
39 mime = "application/octet-stream"
40 }
41
42 storage := "sqlite"
43 s3Upload := s.s3Config != nil && s.s3Config.BlobstoreEnabled
44 if s3Upload {
45 storage = "s3"
46 }
47 blob := models.Blob{
48 Did: urepo.Repo.Did,
49 RefCount: 0,
50 CreatedAt: s.repoman.clock.Next().String(),
51 Storage: storage,
52 }
53
54 if err := s.db.Create(&blob, nil).Error; err != nil {
55 s.logger.Error("error creating new blob in db", "error", err)
56 return helpers.ServerError(e, nil)
57 }
58
59 read := 0
60 part := 0
61
62 buf := make([]byte, 0x10000)
63 fulldata := new(bytes.Buffer)
64
65 for {
66 n, err := io.ReadFull(e.Request().Body, buf)
67 if err == io.ErrUnexpectedEOF || err == io.EOF {
68 if n == 0 {
69 break
70 }
71 } else if err != nil && err != io.ErrUnexpectedEOF {
72 s.logger.Error("error reading blob", "error", err)
73 return helpers.ServerError(e, nil)
74 }
75
76 data := buf[:n]
77 read += n
78 fulldata.Write(data)
79
80 if !s3Upload {
81 blobPart := models.BlobPart{
82 BlobID: blob.ID,
83 Idx: part,
84 Data: data,
85 }
86
87 if err := s.db.Create(&blobPart, nil).Error; err != nil {
88 s.logger.Error("error adding blob part to db", "error", err)
89 return helpers.ServerError(e, nil)
90 }
91 }
92 part++
93
94 if n < blockSize {
95 break
96 }
97 }
98
99 c, err := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256).Sum(fulldata.Bytes())
100 if err != nil {
101 s.logger.Error("error creating cid prefix", "error", err)
102 return helpers.ServerError(e, nil)
103 }
104
105 if s3Upload {
106 config := &aws.Config{
107 Region: aws.String(s.s3Config.Region),
108 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
109 }
110
111 if s.s3Config.Endpoint != "" {
112 config.Endpoint = aws.String(s.s3Config.Endpoint)
113 config.S3ForcePathStyle = aws.Bool(true)
114 }
115
116 sess, err := session.NewSession(config)
117 if err != nil {
118 s.logger.Error("error creating aws session", "error", err)
119 return helpers.ServerError(e, nil)
120 }
121
122 svc := s3.New(sess)
123
124 if _, err := svc.PutObject(&s3.PutObjectInput{
125 Bucket: aws.String(s.s3Config.Bucket),
126 Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
127 Body: bytes.NewReader(fulldata.Bytes()),
128 }); err != nil {
129 s.logger.Error("error uploading blob to s3", "error", err)
130 return helpers.ServerError(e, nil)
131 }
132 }
133
134 if err := s.db.Exec("UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil {
135 // there should probably be somme handling here if this fails...
136 s.logger.Error("error updating blob", "error", err)
137 return helpers.ServerError(e, nil)
138 }
139
140 resp := ComAtprotoRepoUploadBlobResponse{}
141 resp.Blob.Type = "blob"
142 resp.Blob.Ref.Link = c.String()
143 resp.Blob.MimeType = mime
144 resp.Blob.Size = read
145
146 return e.JSON(200, resp)
147}