1package server
2
3import (
4 "bytes"
5 "fmt"
6 "io"
7
8 "github.com/Azure/go-autorest/autorest/to"
9 "github.com/aws/aws-sdk-go/aws"
10 "github.com/aws/aws-sdk-go/aws/credentials"
11 "github.com/aws/aws-sdk-go/aws/session"
12 "github.com/aws/aws-sdk-go/service/s3"
13 "github.com/haileyok/cocoon/internal/helpers"
14 "github.com/haileyok/cocoon/models"
15 "github.com/ipfs/go-cid"
16 "github.com/labstack/echo/v4"
17)
18
19func (s *Server) handleSyncGetBlob(e echo.Context) error {
20 did := e.QueryParam("did")
21 if did == "" {
22 return helpers.InputError(e, nil)
23 }
24
25 cstr := e.QueryParam("cid")
26 if cstr == "" {
27 return helpers.InputError(e, nil)
28 }
29
30 c, err := cid.Parse(cstr)
31 if err != nil {
32 return helpers.InputError(e, nil)
33 }
34
35 urepo, err := s.getRepoActorByDid(did)
36 if err != nil {
37 s.logger.Error("could not find user for requested blob", "error", err)
38 return helpers.InputError(e, nil)
39 }
40
41 status := urepo.Status()
42 if status != nil {
43 if *status == "deactivated" {
44 return helpers.InputError(e, to.StringPtr("RepoDeactivated"))
45 }
46 }
47
48 var blob models.Blob
49 if err := s.db.Raw("SELECT * FROM blobs WHERE did = ? AND cid = ?", nil, did, c.Bytes()).Scan(&blob).Error; err != nil {
50 s.logger.Error("error looking up blob", "error", err)
51 return helpers.ServerError(e, nil)
52 }
53
54 buf := new(bytes.Buffer)
55
56 if blob.Storage == "sqlite" {
57 var parts []models.BlobPart
58 if err := s.db.Raw("SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
59 s.logger.Error("error getting blob parts", "error", err)
60 return helpers.ServerError(e, nil)
61 }
62
63 // TODO: we can just stream this, don't need to make a buffer
64 for _, p := range parts {
65 buf.Write(p.Data)
66 }
67 } else if blob.Storage == "s3" {
68 if !(s.s3Config != nil && s.s3Config.BlobstoreEnabled) {
69 s.logger.Error("s3 storage disabled")
70 return helpers.ServerError(e, nil)
71 }
72
73 blobKey := fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())
74
75 if s.s3Config.CDNUrl != "" {
76 redirectUrl := fmt.Sprintf("%s/%s", s.s3Config.CDNUrl, blobKey)
77 return e.Redirect(302, redirectUrl)
78 }
79
80 config := &aws.Config{
81 Region: aws.String(s.s3Config.Region),
82 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
83 }
84
85 if s.s3Config.Endpoint != "" {
86 config.Endpoint = aws.String(s.s3Config.Endpoint)
87 config.S3ForcePathStyle = aws.Bool(true)
88 }
89
90 sess, err := session.NewSession(config)
91 if err != nil {
92 s.logger.Error("error creating aws session", "error", err)
93 return helpers.ServerError(e, nil)
94 }
95
96 svc := s3.New(sess)
97 if result, err := svc.GetObject(&s3.GetObjectInput{
98 Bucket: aws.String(s.s3Config.Bucket),
99 Key: aws.String(blobKey),
100 }); err != nil {
101 s.logger.Error("error getting blob from s3", "error", err)
102 return helpers.ServerError(e, nil)
103 } else {
104 read := 0
105 part := 0
106 partBuf := make([]byte, 0x10000)
107
108 for {
109 n, err := io.ReadFull(result.Body, partBuf)
110 if err == io.ErrUnexpectedEOF || err == io.EOF {
111 if n == 0 {
112 break
113 }
114 } else if err != nil && err != io.ErrUnexpectedEOF {
115 s.logger.Error("error reading blob", "error", err)
116 return helpers.ServerError(e, nil)
117 }
118
119 data := partBuf[:n]
120 read += n
121 buf.Write(data)
122 part++
123 }
124 }
125 } else {
126 s.logger.Error("unknown storage", "storage", blob.Storage)
127 return helpers.ServerError(e, nil)
128 }
129
130 e.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+c.String())
131
132 return e.Stream(200, "application/octet-stream", buf)
133}