forked from tangled.org/core
this repo has no description
at master 9.3 kB view raw
1package repo 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log" 9 "net/http" 10 "net/url" 11 "time" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 lexutil "github.com/bluesky-social/indigo/lex/util" 15 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 16 "github.com/dustin/go-humanize" 17 "github.com/go-chi/chi/v5" 18 "github.com/go-git/go-git/v5/plumbing" 19 "github.com/ipfs/go-cid" 20 "tangled.org/core/api/tangled" 21 "tangled.org/core/appview/db" 22 "tangled.org/core/appview/models" 23 "tangled.org/core/appview/pages" 24 "tangled.org/core/appview/reporesolver" 25 "tangled.org/core/appview/xrpcclient" 26 "tangled.org/core/tid" 27 "tangled.org/core/types" 28) 29 30// TODO: proper statuses here on early exit 31func (rp *Repo) AttachArtifact(w http.ResponseWriter, r *http.Request) { 32 user := rp.oauth.GetUser(r) 33 tagParam := chi.URLParam(r, "tag") 34 f, err := rp.repoResolver.Resolve(r) 35 if err != nil { 36 log.Println("failed to get repo and knot", err) 37 rp.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution") 38 return 39 } 40 41 tag, err := rp.resolveTag(r.Context(), f, tagParam) 42 if err != nil { 43 log.Println("failed to resolve tag", err) 44 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 45 return 46 } 47 48 file, handler, err := r.FormFile("artifact") 49 if err != nil { 50 log.Println("failed to upload artifact", err) 51 rp.pages.Notice(w, "upload", "failed to upload artifact") 52 return 53 } 54 defer file.Close() 55 56 client, err := rp.oauth.AuthorizedClient(r) 57 if err != nil { 58 log.Println("failed to get authorized client", err) 59 rp.pages.Notice(w, "upload", "failed to get authorized client") 60 return 61 } 62 63 uploadBlobResp, err := client.RepoUploadBlob(r.Context(), file) 64 if err != nil { 65 log.Println("failed to upload blob", err) 66 rp.pages.Notice(w, "upload", "Failed to upload blob to your PDS. Try again later.") 67 return 68 } 69 70 log.Println("uploaded blob", humanize.Bytes(uint64(uploadBlobResp.Blob.Size)), uploadBlobResp.Blob.Ref.String()) 71 72 rkey := tid.TID() 73 createdAt := time.Now() 74 75 putRecordResp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 76 Collection: tangled.RepoArtifactNSID, 77 Repo: user.Did, 78 Rkey: rkey, 79 Record: &lexutil.LexiconTypeDecoder{ 80 Val: &tangled.RepoArtifact{ 81 Artifact: uploadBlobResp.Blob, 82 CreatedAt: createdAt.Format(time.RFC3339), 83 Name: handler.Filename, 84 Repo: f.RepoAt().String(), 85 Tag: tag.Tag.Hash[:], 86 }, 87 }, 88 }) 89 if err != nil { 90 log.Println("failed to create record", err) 91 rp.pages.Notice(w, "upload", "Failed to create artifact record. Try again later.") 92 return 93 } 94 95 log.Println(putRecordResp.Uri) 96 97 tx, err := rp.db.BeginTx(r.Context(), nil) 98 if err != nil { 99 log.Println("failed to start tx") 100 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 101 return 102 } 103 defer tx.Rollback() 104 105 artifact := models.Artifact{ 106 Did: user.Did, 107 Rkey: rkey, 108 RepoAt: f.RepoAt(), 109 Tag: tag.Tag.Hash, 110 CreatedAt: createdAt, 111 BlobCid: cid.Cid(uploadBlobResp.Blob.Ref), 112 Name: handler.Filename, 113 Size: uint64(uploadBlobResp.Blob.Size), 114 MimeType: uploadBlobResp.Blob.MimeType, 115 } 116 117 err = db.AddArtifact(tx, artifact) 118 if err != nil { 119 log.Println("failed to add artifact record to db", err) 120 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 121 return 122 } 123 124 err = tx.Commit() 125 if err != nil { 126 log.Println("failed to add artifact record to db") 127 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 128 return 129 } 130 131 rp.pages.RepoArtifactFragment(w, pages.RepoArtifactParams{ 132 LoggedInUser: user, 133 RepoInfo: f.RepoInfo(user), 134 Artifact: artifact, 135 }) 136} 137 138func (rp *Repo) DownloadArtifact(w http.ResponseWriter, r *http.Request) { 139 f, err := rp.repoResolver.Resolve(r) 140 if err != nil { 141 log.Println("failed to get repo and knot", err) 142 http.Error(w, "failed to resolve repo", http.StatusInternalServerError) 143 return 144 } 145 146 tagParam := chi.URLParam(r, "tag") 147 filename := chi.URLParam(r, "file") 148 149 tag, err := rp.resolveTag(r.Context(), f, tagParam) 150 if err != nil { 151 log.Println("failed to resolve tag", err) 152 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 153 return 154 } 155 156 artifacts, err := db.GetArtifact( 157 rp.db, 158 db.FilterEq("repo_at", f.RepoAt()), 159 db.FilterEq("tag", tag.Tag.Hash[:]), 160 db.FilterEq("name", filename), 161 ) 162 if err != nil { 163 log.Println("failed to get artifacts", err) 164 http.Error(w, "failed to get artifact", http.StatusInternalServerError) 165 return 166 } 167 168 if len(artifacts) != 1 { 169 log.Printf("too many or too few artifacts found") 170 http.Error(w, "artifact not found", http.StatusNotFound) 171 return 172 } 173 174 artifact := artifacts[0] 175 176 ownerPds := f.OwnerId.PDSEndpoint() 177 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 178 q := url.Query() 179 q.Set("cid", artifact.BlobCid.String()) 180 q.Set("did", artifact.Did) 181 url.RawQuery = q.Encode() 182 183 req, err := http.NewRequest(http.MethodGet, url.String(), nil) 184 if err != nil { 185 log.Println("failed to create request", err) 186 http.Error(w, "failed to create request", http.StatusInternalServerError) 187 return 188 } 189 req.Header.Set("Content-Type", "application/json") 190 191 resp, err := http.DefaultClient.Do(req) 192 if err != nil { 193 log.Println("failed to make request", err) 194 http.Error(w, "failed to make request to PDS", http.StatusInternalServerError) 195 return 196 } 197 defer resp.Body.Close() 198 199 // copy status code and relevant headers from upstream response 200 w.WriteHeader(resp.StatusCode) 201 for key, values := range resp.Header { 202 for _, v := range values { 203 w.Header().Add(key, v) 204 } 205 } 206 207 // stream the body directly to the client 208 if _, err := io.Copy(w, resp.Body); err != nil { 209 log.Println("error streaming response to client:", err) 210 } 211} 212 213// TODO: proper statuses here on early exit 214func (rp *Repo) DeleteArtifact(w http.ResponseWriter, r *http.Request) { 215 user := rp.oauth.GetUser(r) 216 tagParam := chi.URLParam(r, "tag") 217 filename := chi.URLParam(r, "file") 218 f, err := rp.repoResolver.Resolve(r) 219 if err != nil { 220 log.Println("failed to get repo and knot", err) 221 return 222 } 223 224 client, _ := rp.oauth.AuthorizedClient(r) 225 226 tag := plumbing.NewHash(tagParam) 227 228 artifacts, err := db.GetArtifact( 229 rp.db, 230 db.FilterEq("repo_at", f.RepoAt()), 231 db.FilterEq("tag", tag[:]), 232 db.FilterEq("name", filename), 233 ) 234 if err != nil { 235 log.Println("failed to get artifacts", err) 236 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 237 return 238 } 239 if len(artifacts) != 1 { 240 rp.pages.Notice(w, "remove", "Unable to find artifact.") 241 return 242 } 243 244 artifact := artifacts[0] 245 246 if user.Did != artifact.Did { 247 log.Println("user not authorized to delete artifact", err) 248 rp.pages.Notice(w, "remove", "Unauthorized deletion of artifact.") 249 return 250 } 251 252 _, err = client.RepoDeleteRecord(r.Context(), &comatproto.RepoDeleteRecord_Input{ 253 Collection: tangled.RepoArtifactNSID, 254 Repo: user.Did, 255 Rkey: artifact.Rkey, 256 }) 257 if err != nil { 258 log.Println("failed to get blob from pds", err) 259 rp.pages.Notice(w, "remove", "Failed to remove blob from PDS.") 260 return 261 } 262 263 tx, err := rp.db.BeginTx(r.Context(), nil) 264 if err != nil { 265 log.Println("failed to start tx") 266 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 267 return 268 } 269 defer tx.Rollback() 270 271 err = db.DeleteArtifact(tx, 272 db.FilterEq("repo_at", f.RepoAt()), 273 db.FilterEq("tag", artifact.Tag[:]), 274 db.FilterEq("name", filename), 275 ) 276 if err != nil { 277 log.Println("failed to remove artifact record from db", err) 278 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 279 return 280 } 281 282 err = tx.Commit() 283 if err != nil { 284 log.Println("failed to remove artifact record from db") 285 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 286 return 287 } 288 289 w.Write([]byte{}) 290} 291 292func (rp *Repo) resolveTag(ctx context.Context, f *reporesolver.ResolvedRepo, tagParam string) (*types.TagReference, error) { 293 tagParam, err := url.QueryUnescape(tagParam) 294 if err != nil { 295 return nil, err 296 } 297 298 scheme := "http" 299 if !rp.config.Core.Dev { 300 scheme = "https" 301 } 302 host := fmt.Sprintf("%s://%s", scheme, f.Knot) 303 xrpcc := &indigoxrpc.Client{ 304 Host: host, 305 } 306 307 repo := fmt.Sprintf("%s/%s", f.OwnerDid(), f.Name) 308 xrpcBytes, err := tangled.RepoTags(ctx, xrpcc, "", 0, repo) 309 if err != nil { 310 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 311 log.Println("failed to call XRPC repo.tags", xrpcerr) 312 return nil, xrpcerr 313 } 314 log.Println("failed to reach knotserver", err) 315 return nil, err 316 } 317 318 var result types.RepoTagsResponse 319 if err := json.Unmarshal(xrpcBytes, &result); err != nil { 320 log.Println("failed to decode XRPC tags response", err) 321 return nil, err 322 } 323 324 var tag *types.TagReference 325 for _, t := range result.Tags { 326 if t.Tag != nil { 327 if t.Reference.Name == tagParam || t.Reference.Hash == tagParam { 328 tag = t 329 } 330 } 331 } 332 333 if tag == nil { 334 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 335 } 336 337 if tag.Tag.Target.IsZero() { 338 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 339 } 340 341 return tag, nil 342}