forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package repo
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log"
9 "net/http"
10 "net/url"
11 "time"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 lexutil "github.com/bluesky-social/indigo/lex/util"
15 indigoxrpc "github.com/bluesky-social/indigo/xrpc"
16 "github.com/dustin/go-humanize"
17 "github.com/go-chi/chi/v5"
18 "github.com/go-git/go-git/v5/plumbing"
19 "github.com/ipfs/go-cid"
20 "tangled.sh/tangled.sh/core/api/tangled"
21 "tangled.sh/tangled.sh/core/appview/db"
22 "tangled.sh/tangled.sh/core/appview/pages"
23 "tangled.sh/tangled.sh/core/appview/reporesolver"
24 "tangled.sh/tangled.sh/core/appview/xrpcclient"
25 "tangled.sh/tangled.sh/core/tid"
26 "tangled.sh/tangled.sh/core/types"
27)
28
29// TODO: proper statuses here on early exit
30func (rp *Repo) AttachArtifact(w http.ResponseWriter, r *http.Request) {
31 user := rp.oauth.GetUser(r)
32 tagParam := chi.URLParam(r, "tag")
33 f, err := rp.repoResolver.Resolve(r)
34 if err != nil {
35 log.Println("failed to get repo and knot", err)
36 rp.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution")
37 return
38 }
39
40 tag, err := rp.resolveTag(r.Context(), f, tagParam)
41 if err != nil {
42 log.Println("failed to resolve tag", err)
43 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution")
44 return
45 }
46
47 file, handler, err := r.FormFile("artifact")
48 if err != nil {
49 log.Println("failed to upload artifact", err)
50 rp.pages.Notice(w, "upload", "failed to upload artifact")
51 return
52 }
53 defer file.Close()
54
55 client, err := rp.oauth.AuthorizedClient(r)
56 if err != nil {
57 log.Println("failed to get authorized client", err)
58 rp.pages.Notice(w, "upload", "failed to get authorized client")
59 return
60 }
61
62 uploadBlobResp, err := client.RepoUploadBlob(r.Context(), file)
63 if err != nil {
64 log.Println("failed to upload blob", err)
65 rp.pages.Notice(w, "upload", "Failed to upload blob to your PDS. Try again later.")
66 return
67 }
68
69 log.Println("uploaded blob", humanize.Bytes(uint64(uploadBlobResp.Blob.Size)), uploadBlobResp.Blob.Ref.String())
70
71 rkey := tid.TID()
72 createdAt := time.Now()
73
74 putRecordResp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{
75 Collection: tangled.RepoArtifactNSID,
76 Repo: user.Did,
77 Rkey: rkey,
78 Record: &lexutil.LexiconTypeDecoder{
79 Val: &tangled.RepoArtifact{
80 Artifact: uploadBlobResp.Blob,
81 CreatedAt: createdAt.Format(time.RFC3339),
82 Name: handler.Filename,
83 Repo: f.RepoAt().String(),
84 Tag: tag.Tag.Hash[:],
85 },
86 },
87 })
88 if err != nil {
89 log.Println("failed to create record", err)
90 rp.pages.Notice(w, "upload", "Failed to create artifact record. Try again later.")
91 return
92 }
93
94 log.Println(putRecordResp.Uri)
95
96 tx, err := rp.db.BeginTx(r.Context(), nil)
97 if err != nil {
98 log.Println("failed to start tx")
99 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
100 return
101 }
102 defer tx.Rollback()
103
104 artifact := db.Artifact{
105 Did: user.Did,
106 Rkey: rkey,
107 RepoAt: f.RepoAt(),
108 Tag: tag.Tag.Hash,
109 CreatedAt: createdAt,
110 BlobCid: cid.Cid(uploadBlobResp.Blob.Ref),
111 Name: handler.Filename,
112 Size: uint64(uploadBlobResp.Blob.Size),
113 MimeType: uploadBlobResp.Blob.MimeType,
114 }
115
116 err = db.AddArtifact(tx, artifact)
117 if err != nil {
118 log.Println("failed to add artifact record to db", err)
119 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
120 return
121 }
122
123 err = tx.Commit()
124 if err != nil {
125 log.Println("failed to add artifact record to db")
126 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
127 return
128 }
129
130 rp.pages.RepoArtifactFragment(w, pages.RepoArtifactParams{
131 LoggedInUser: user,
132 RepoInfo: f.RepoInfo(user),
133 Artifact: artifact,
134 })
135}
136
137func (rp *Repo) DownloadArtifact(w http.ResponseWriter, r *http.Request) {
138 f, err := rp.repoResolver.Resolve(r)
139 if err != nil {
140 log.Println("failed to get repo and knot", err)
141 http.Error(w, "failed to resolve repo", http.StatusInternalServerError)
142 return
143 }
144
145 tagParam := chi.URLParam(r, "tag")
146 filename := chi.URLParam(r, "file")
147
148 tag, err := rp.resolveTag(r.Context(), f, tagParam)
149 if err != nil {
150 log.Println("failed to resolve tag", err)
151 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution")
152 return
153 }
154
155 artifacts, err := db.GetArtifact(
156 rp.db,
157 db.FilterEq("repo_at", f.RepoAt()),
158 db.FilterEq("tag", tag.Tag.Hash[:]),
159 db.FilterEq("name", filename),
160 )
161 if err != nil {
162 log.Println("failed to get artifacts", err)
163 http.Error(w, "failed to get artifact", http.StatusInternalServerError)
164 return
165 }
166
167 if len(artifacts) != 1 {
168 log.Printf("too many or too few artifacts found")
169 http.Error(w, "artifact not found", http.StatusNotFound)
170 return
171 }
172
173 artifact := artifacts[0]
174
175 ownerPds := f.OwnerId.PDSEndpoint()
176 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
177 q := url.Query()
178 q.Set("cid", artifact.BlobCid.String())
179 q.Set("did", artifact.Did)
180 url.RawQuery = q.Encode()
181
182 req, err := http.NewRequest(http.MethodGet, url.String(), nil)
183 if err != nil {
184 log.Println("failed to create request", err)
185 http.Error(w, "failed to create request", http.StatusInternalServerError)
186 return
187 }
188 req.Header.Set("Content-Type", "application/json")
189
190 resp, err := http.DefaultClient.Do(req)
191 if err != nil {
192 log.Println("failed to make request", err)
193 http.Error(w, "failed to make request to PDS", http.StatusInternalServerError)
194 return
195 }
196 defer resp.Body.Close()
197
198 // copy status code and relevant headers from upstream response
199 w.WriteHeader(resp.StatusCode)
200 for key, values := range resp.Header {
201 for _, v := range values {
202 w.Header().Add(key, v)
203 }
204 }
205
206 // stream the body directly to the client
207 if _, err := io.Copy(w, resp.Body); err != nil {
208 log.Println("error streaming response to client:", err)
209 }
210}
211
212// TODO: proper statuses here on early exit
213func (rp *Repo) DeleteArtifact(w http.ResponseWriter, r *http.Request) {
214 user := rp.oauth.GetUser(r)
215 tagParam := chi.URLParam(r, "tag")
216 filename := chi.URLParam(r, "file")
217 f, err := rp.repoResolver.Resolve(r)
218 if err != nil {
219 log.Println("failed to get repo and knot", err)
220 return
221 }
222
223 client, _ := rp.oauth.AuthorizedClient(r)
224
225 tag := plumbing.NewHash(tagParam)
226
227 artifacts, err := db.GetArtifact(
228 rp.db,
229 db.FilterEq("repo_at", f.RepoAt()),
230 db.FilterEq("tag", tag[:]),
231 db.FilterEq("name", filename),
232 )
233 if err != nil {
234 log.Println("failed to get artifacts", err)
235 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
236 return
237 }
238 if len(artifacts) != 1 {
239 rp.pages.Notice(w, "remove", "Unable to find artifact.")
240 return
241 }
242
243 artifact := artifacts[0]
244
245 if user.Did != artifact.Did {
246 log.Println("user not authorized to delete artifact", err)
247 rp.pages.Notice(w, "remove", "Unauthorized deletion of artifact.")
248 return
249 }
250
251 _, err = client.RepoDeleteRecord(r.Context(), &comatproto.RepoDeleteRecord_Input{
252 Collection: tangled.RepoArtifactNSID,
253 Repo: user.Did,
254 Rkey: artifact.Rkey,
255 })
256 if err != nil {
257 log.Println("failed to get blob from pds", err)
258 rp.pages.Notice(w, "remove", "Failed to remove blob from PDS.")
259 return
260 }
261
262 tx, err := rp.db.BeginTx(r.Context(), nil)
263 if err != nil {
264 log.Println("failed to start tx")
265 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
266 return
267 }
268 defer tx.Rollback()
269
270 err = db.DeleteArtifact(tx,
271 db.FilterEq("repo_at", f.RepoAt()),
272 db.FilterEq("tag", artifact.Tag[:]),
273 db.FilterEq("name", filename),
274 )
275 if err != nil {
276 log.Println("failed to remove artifact record from db", err)
277 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
278 return
279 }
280
281 err = tx.Commit()
282 if err != nil {
283 log.Println("failed to remove artifact record from db")
284 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
285 return
286 }
287
288 w.Write([]byte{})
289}
290
291func (rp *Repo) resolveTag(ctx context.Context, f *reporesolver.ResolvedRepo, tagParam string) (*types.TagReference, error) {
292 tagParam, err := url.QueryUnescape(tagParam)
293 if err != nil {
294 return nil, err
295 }
296
297 scheme := "http"
298 if !rp.config.Core.Dev {
299 scheme = "https"
300 }
301 host := fmt.Sprintf("%s://%s", scheme, f.Knot)
302 xrpcc := &indigoxrpc.Client{
303 Host: host,
304 }
305
306 repo := fmt.Sprintf("%s/%s", f.OwnerDid(), f.Name)
307 xrpcBytes, err := tangled.RepoTags(ctx, xrpcc, "", 0, repo)
308 if err != nil {
309 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
310 log.Println("failed to call XRPC repo.tags", xrpcerr)
311 return nil, xrpcerr
312 }
313 log.Println("failed to reach knotserver", err)
314 return nil, err
315 }
316
317 var result types.RepoTagsResponse
318 if err := json.Unmarshal(xrpcBytes, &result); err != nil {
319 log.Println("failed to decode XRPC tags response", err)
320 return nil, err
321 }
322
323 var tag *types.TagReference
324 for _, t := range result.Tags {
325 if t.Tag != nil {
326 if t.Reference.Name == tagParam || t.Reference.Hash == tagParam {
327 tag = t
328 }
329 }
330 }
331
332 if tag == nil {
333 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts")
334 }
335
336 if tag.Tag.Target.IsZero() {
337 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts")
338 }
339
340 return tag, nil
341}