1package server
2
3import (
4 "bytes"
5 "fmt"
6 "io"
7
8 "github.com/Azure/go-autorest/autorest/to"
9 "github.com/aws/aws-sdk-go/aws"
10 "github.com/aws/aws-sdk-go/aws/credentials"
11 "github.com/aws/aws-sdk-go/aws/session"
12 "github.com/aws/aws-sdk-go/service/s3"
13 "github.com/haileyok/cocoon/internal/helpers"
14 "github.com/haileyok/cocoon/models"
15 "github.com/ipfs/go-cid"
16 "github.com/labstack/echo/v4"
17)
18
19func (s *Server) handleSyncGetBlob(e echo.Context) error {
20 did := e.QueryParam("did")
21 if did == "" {
22 return helpers.InputError(e, nil)
23 }
24
25 cstr := e.QueryParam("cid")
26 if cstr == "" {
27 return helpers.InputError(e, nil)
28 }
29
30 c, err := cid.Parse(cstr)
31 if err != nil {
32 return helpers.InputError(e, nil)
33 }
34
35 urepo, err := s.getRepoActorByDid(did)
36 if err != nil {
37 s.logger.Error("could not find user for requested blob", "error", err)
38 return helpers.InputError(e, nil)
39 }
40
41 status := urepo.Status()
42 if status != nil {
43 if *status == "deactivated" {
44 return helpers.InputError(e, to.StringPtr("RepoDeactivated"))
45 }
46 }
47
48 var blob models.Blob
49 if err := s.db.Raw("SELECT * FROM blobs WHERE did = ? AND cid = ?", nil, did, c.Bytes()).Scan(&blob).Error; err != nil {
50 s.logger.Error("error looking up blob", "error", err)
51 return helpers.ServerError(e, nil)
52 }
53
54 buf := new(bytes.Buffer)
55
56 if blob.Storage == "sqlite" {
57 var parts []models.BlobPart
58 if err := s.db.Raw("SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
59 s.logger.Error("error getting blob parts", "error", err)
60 return helpers.ServerError(e, nil)
61 }
62
63 // TODO: we can just stream this, don't need to make a buffer
64 for _, p := range parts {
65 buf.Write(p.Data)
66 }
67 } else if blob.Storage == "s3" {
68 if !(s.s3Config != nil && s.s3Config.BlobstoreEnabled) {
69 s.logger.Error("s3 storage disabled")
70 return helpers.ServerError(e, nil)
71 }
72
73 config := &aws.Config{
74 Region: aws.String(s.s3Config.Region),
75 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
76 }
77
78 if s.s3Config.Endpoint != "" {
79 config.Endpoint = aws.String(s.s3Config.Endpoint)
80 config.S3ForcePathStyle = aws.Bool(true)
81 }
82
83 sess, err := session.NewSession(config)
84 if err != nil {
85 s.logger.Error("error creating aws session", "error", err)
86 return helpers.ServerError(e, nil)
87 }
88
89 svc := s3.New(sess)
90 if result, err := svc.GetObject(&s3.GetObjectInput{
91 Bucket: aws.String(s.s3Config.Bucket),
92 Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
93 }); err != nil {
94 s.logger.Error("error getting blob from s3", "error", err)
95 return helpers.ServerError(e, nil)
96 } else {
97 read := 0
98 part := 0
99 partBuf := make([]byte, 0x10000)
100
101 for {
102 n, err := io.ReadFull(result.Body, partBuf)
103 if err == io.ErrUnexpectedEOF || err == io.EOF {
104 if n == 0 {
105 break
106 }
107 } else if err != nil && err != io.ErrUnexpectedEOF {
108 s.logger.Error("error reading blob", "error", err)
109 return helpers.ServerError(e, nil)
110 }
111
112 data := partBuf[:n]
113 read += n
114 buf.Write(data)
115 part++
116 }
117 }
118 } else {
119 s.logger.Error("unknown storage", "storage", blob.Storage)
120 return helpers.ServerError(e, nil)
121 }
122
123 e.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+c.String())
124
125 return e.Stream(200, "application/octet-stream", buf)
126}