forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package repo
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log"
9 "net/http"
10 "net/url"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview/db"
15 "tangled.org/core/appview/models"
16 "tangled.org/core/appview/pages"
17 "tangled.org/core/appview/reporesolver"
18 "tangled.org/core/appview/xrpcclient"
19 "tangled.org/core/tid"
20 "tangled.org/core/types"
21
22 comatproto "github.com/bluesky-social/indigo/api/atproto"
23 lexutil "github.com/bluesky-social/indigo/lex/util"
24 indigoxrpc "github.com/bluesky-social/indigo/xrpc"
25 "github.com/dustin/go-humanize"
26 "github.com/go-chi/chi/v5"
27 "github.com/go-git/go-git/v5/plumbing"
28 "github.com/ipfs/go-cid"
29)
30
31// TODO: proper statuses here on early exit
32func (rp *Repo) AttachArtifact(w http.ResponseWriter, r *http.Request) {
33 user := rp.oauth.GetUser(r)
34 tagParam := chi.URLParam(r, "tag")
35 f, err := rp.repoResolver.Resolve(r)
36 if err != nil {
37 log.Println("failed to get repo and knot", err)
38 rp.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution")
39 return
40 }
41
42 tag, err := rp.resolveTag(r.Context(), f, tagParam)
43 if err != nil {
44 log.Println("failed to resolve tag", err)
45 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution")
46 return
47 }
48
49 file, handler, err := r.FormFile("artifact")
50 if err != nil {
51 log.Println("failed to upload artifact", err)
52 rp.pages.Notice(w, "upload", "failed to upload artifact")
53 return
54 }
55 defer file.Close()
56
57 client, err := rp.oauth.AuthorizedClient(r)
58 if err != nil {
59 log.Println("failed to get authorized client", err)
60 rp.pages.Notice(w, "upload", "failed to get authorized client")
61 return
62 }
63
64 uploadBlobResp, err := comatproto.RepoUploadBlob(r.Context(), client, file)
65 if err != nil {
66 log.Println("failed to upload blob", err)
67 rp.pages.Notice(w, "upload", "Failed to upload blob to your PDS. Try again later.")
68 return
69 }
70
71 log.Println("uploaded blob", humanize.Bytes(uint64(uploadBlobResp.Blob.Size)), uploadBlobResp.Blob.Ref.String())
72
73 rkey := tid.TID()
74 createdAt := time.Now()
75
76 putRecordResp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
77 Collection: tangled.RepoArtifactNSID,
78 Repo: user.Did,
79 Rkey: rkey,
80 Record: &lexutil.LexiconTypeDecoder{
81 Val: &tangled.RepoArtifact{
82 Artifact: uploadBlobResp.Blob,
83 CreatedAt: createdAt.Format(time.RFC3339),
84 Name: handler.Filename,
85 Repo: f.RepoAt().String(),
86 Tag: tag.Tag.Hash[:],
87 },
88 },
89 })
90 if err != nil {
91 log.Println("failed to create record", err)
92 rp.pages.Notice(w, "upload", "Failed to create artifact record. Try again later.")
93 return
94 }
95
96 log.Println(putRecordResp.Uri)
97
98 tx, err := rp.db.BeginTx(r.Context(), nil)
99 if err != nil {
100 log.Println("failed to start tx")
101 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
102 return
103 }
104 defer tx.Rollback()
105
106 artifact := models.Artifact{
107 Did: user.Did,
108 Rkey: rkey,
109 RepoAt: f.RepoAt(),
110 Tag: tag.Tag.Hash,
111 CreatedAt: createdAt,
112 BlobCid: cid.Cid(uploadBlobResp.Blob.Ref),
113 Name: handler.Filename,
114 Size: uint64(uploadBlobResp.Blob.Size),
115 MimeType: uploadBlobResp.Blob.MimeType,
116 }
117
118 err = db.AddArtifact(tx, artifact)
119 if err != nil {
120 log.Println("failed to add artifact record to db", err)
121 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
122 return
123 }
124
125 err = tx.Commit()
126 if err != nil {
127 log.Println("failed to add artifact record to db")
128 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
129 return
130 }
131
132 rp.pages.RepoArtifactFragment(w, pages.RepoArtifactParams{
133 LoggedInUser: user,
134 RepoInfo: f.RepoInfo(user),
135 Artifact: artifact,
136 })
137}
138
139func (rp *Repo) DownloadArtifact(w http.ResponseWriter, r *http.Request) {
140 f, err := rp.repoResolver.Resolve(r)
141 if err != nil {
142 log.Println("failed to get repo and knot", err)
143 http.Error(w, "failed to resolve repo", http.StatusInternalServerError)
144 return
145 }
146
147 tagParam := chi.URLParam(r, "tag")
148 filename := chi.URLParam(r, "file")
149
150 tag, err := rp.resolveTag(r.Context(), f, tagParam)
151 if err != nil {
152 log.Println("failed to resolve tag", err)
153 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution")
154 return
155 }
156
157 artifacts, err := db.GetArtifact(
158 rp.db,
159 db.FilterEq("repo_at", f.RepoAt()),
160 db.FilterEq("tag", tag.Tag.Hash[:]),
161 db.FilterEq("name", filename),
162 )
163 if err != nil {
164 log.Println("failed to get artifacts", err)
165 http.Error(w, "failed to get artifact", http.StatusInternalServerError)
166 return
167 }
168
169 if len(artifacts) != 1 {
170 log.Printf("too many or too few artifacts found")
171 http.Error(w, "artifact not found", http.StatusNotFound)
172 return
173 }
174
175 artifact := artifacts[0]
176
177 ownerPds := f.OwnerId.PDSEndpoint()
178 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
179 q := url.Query()
180 q.Set("cid", artifact.BlobCid.String())
181 q.Set("did", artifact.Did)
182 url.RawQuery = q.Encode()
183
184 req, err := http.NewRequest(http.MethodGet, url.String(), nil)
185 if err != nil {
186 log.Println("failed to create request", err)
187 http.Error(w, "failed to create request", http.StatusInternalServerError)
188 return
189 }
190 req.Header.Set("Content-Type", "application/json")
191
192 resp, err := http.DefaultClient.Do(req)
193 if err != nil {
194 log.Println("failed to make request", err)
195 http.Error(w, "failed to make request to PDS", http.StatusInternalServerError)
196 return
197 }
198 defer resp.Body.Close()
199
200 // copy status code and relevant headers from upstream response
201 w.WriteHeader(resp.StatusCode)
202 for key, values := range resp.Header {
203 for _, v := range values {
204 w.Header().Add(key, v)
205 }
206 }
207
208 // stream the body directly to the client
209 if _, err := io.Copy(w, resp.Body); err != nil {
210 log.Println("error streaming response to client:", err)
211 }
212}
213
214// TODO: proper statuses here on early exit
215func (rp *Repo) DeleteArtifact(w http.ResponseWriter, r *http.Request) {
216 user := rp.oauth.GetUser(r)
217 tagParam := chi.URLParam(r, "tag")
218 filename := chi.URLParam(r, "file")
219 f, err := rp.repoResolver.Resolve(r)
220 if err != nil {
221 log.Println("failed to get repo and knot", err)
222 return
223 }
224
225 client, _ := rp.oauth.AuthorizedClient(r)
226
227 tag := plumbing.NewHash(tagParam)
228
229 artifacts, err := db.GetArtifact(
230 rp.db,
231 db.FilterEq("repo_at", f.RepoAt()),
232 db.FilterEq("tag", tag[:]),
233 db.FilterEq("name", filename),
234 )
235 if err != nil {
236 log.Println("failed to get artifacts", err)
237 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
238 return
239 }
240 if len(artifacts) != 1 {
241 rp.pages.Notice(w, "remove", "Unable to find artifact.")
242 return
243 }
244
245 artifact := artifacts[0]
246
247 if user.Did != artifact.Did {
248 log.Println("user not authorized to delete artifact", err)
249 rp.pages.Notice(w, "remove", "Unauthorized deletion of artifact.")
250 return
251 }
252
253 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
254 Collection: tangled.RepoArtifactNSID,
255 Repo: user.Did,
256 Rkey: artifact.Rkey,
257 })
258 if err != nil {
259 log.Println("failed to get blob from pds", err)
260 rp.pages.Notice(w, "remove", "Failed to remove blob from PDS.")
261 return
262 }
263
264 tx, err := rp.db.BeginTx(r.Context(), nil)
265 if err != nil {
266 log.Println("failed to start tx")
267 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
268 return
269 }
270 defer tx.Rollback()
271
272 err = db.DeleteArtifact(tx,
273 db.FilterEq("repo_at", f.RepoAt()),
274 db.FilterEq("tag", artifact.Tag[:]),
275 db.FilterEq("name", filename),
276 )
277 if err != nil {
278 log.Println("failed to remove artifact record from db", err)
279 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
280 return
281 }
282
283 err = tx.Commit()
284 if err != nil {
285 log.Println("failed to remove artifact record from db")
286 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
287 return
288 }
289
290 w.Write([]byte{})
291}
292
293func (rp *Repo) resolveTag(ctx context.Context, f *reporesolver.ResolvedRepo, tagParam string) (*types.TagReference, error) {
294 tagParam, err := url.QueryUnescape(tagParam)
295 if err != nil {
296 return nil, err
297 }
298
299 scheme := "http"
300 if !rp.config.Core.Dev {
301 scheme = "https"
302 }
303 host := fmt.Sprintf("%s://%s", scheme, f.Knot)
304 xrpcc := &indigoxrpc.Client{
305 Host: host,
306 }
307
308 repo := fmt.Sprintf("%s/%s", f.OwnerDid(), f.Name)
309 xrpcBytes, err := tangled.RepoTags(ctx, xrpcc, "", 0, repo)
310 if err != nil {
311 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
312 log.Println("failed to call XRPC repo.tags", xrpcerr)
313 return nil, xrpcerr
314 }
315 log.Println("failed to reach knotserver", err)
316 return nil, err
317 }
318
319 var result types.RepoTagsResponse
320 if err := json.Unmarshal(xrpcBytes, &result); err != nil {
321 log.Println("failed to decode XRPC tags response", err)
322 return nil, err
323 }
324
325 var tag *types.TagReference
326 for _, t := range result.Tags {
327 if t.Tag != nil {
328 if t.Reference.Name == tagParam || t.Reference.Hash == tagParam {
329 tag = t
330 }
331 }
332 }
333
334 if tag == nil {
335 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts")
336 }
337
338 if tag.Tag.Target.IsZero() {
339 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts")
340 }
341
342 return tag, nil
343}