forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package repo 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log" 9 "net/http" 10 "net/url" 11 "time" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 lexutil "github.com/bluesky-social/indigo/lex/util" 15 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 16 "github.com/dustin/go-humanize" 17 "github.com/go-chi/chi/v5" 18 "github.com/go-git/go-git/v5/plumbing" 19 "github.com/ipfs/go-cid" 20 "tangled.sh/tangled.sh/core/api/tangled" 21 "tangled.sh/tangled.sh/core/appview/db" 22 "tangled.sh/tangled.sh/core/appview/pages" 23 "tangled.sh/tangled.sh/core/appview/reporesolver" 24 "tangled.sh/tangled.sh/core/appview/xrpcclient" 25 "tangled.sh/tangled.sh/core/tid" 26 "tangled.sh/tangled.sh/core/types" 27) 28 29// TODO: proper statuses here on early exit 30func (rp *Repo) AttachArtifact(w http.ResponseWriter, r *http.Request) { 31 user := rp.oauth.GetUser(r) 32 tagParam := chi.URLParam(r, "tag") 33 f, err := rp.repoResolver.Resolve(r) 34 if err != nil { 35 log.Println("failed to get repo and knot", err) 36 rp.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution") 37 return 38 } 39 40 tag, err := rp.resolveTag(r.Context(), f, tagParam) 41 if err != nil { 42 log.Println("failed to resolve tag", err) 43 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 44 return 45 } 46 47 file, handler, err := r.FormFile("artifact") 48 if err != nil { 49 log.Println("failed to upload artifact", err) 50 rp.pages.Notice(w, "upload", "failed to upload artifact") 51 return 52 } 53 defer file.Close() 54 55 client, err := rp.oauth.AuthorizedClient(r) 56 if err != nil { 57 log.Println("failed to get authorized client", err) 58 rp.pages.Notice(w, "upload", "failed to get authorized client") 59 return 60 } 61 62 uploadBlobResp, err := client.RepoUploadBlob(r.Context(), file) 63 if err != nil { 64 log.Println("failed to upload blob", err) 65 rp.pages.Notice(w, "upload", "Failed to upload blob to your PDS. Try again later.") 66 return 67 } 68 69 log.Println("uploaded blob", humanize.Bytes(uint64(uploadBlobResp.Blob.Size)), uploadBlobResp.Blob.Ref.String()) 70 71 rkey := tid.TID() 72 createdAt := time.Now() 73 74 putRecordResp, err := client.RepoPutRecord(r.Context(), &comatproto.RepoPutRecord_Input{ 75 Collection: tangled.RepoArtifactNSID, 76 Repo: user.Did, 77 Rkey: rkey, 78 Record: &lexutil.LexiconTypeDecoder{ 79 Val: &tangled.RepoArtifact{ 80 Artifact: uploadBlobResp.Blob, 81 CreatedAt: createdAt.Format(time.RFC3339), 82 Name: handler.Filename, 83 Repo: f.RepoAt().String(), 84 Tag: tag.Tag.Hash[:], 85 }, 86 }, 87 }) 88 if err != nil { 89 log.Println("failed to create record", err) 90 rp.pages.Notice(w, "upload", "Failed to create artifact record. Try again later.") 91 return 92 } 93 94 log.Println(putRecordResp.Uri) 95 96 tx, err := rp.db.BeginTx(r.Context(), nil) 97 if err != nil { 98 log.Println("failed to start tx") 99 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 100 return 101 } 102 defer tx.Rollback() 103 104 artifact := db.Artifact{ 105 Did: user.Did, 106 Rkey: rkey, 107 RepoAt: f.RepoAt(), 108 Tag: tag.Tag.Hash, 109 CreatedAt: createdAt, 110 BlobCid: cid.Cid(uploadBlobResp.Blob.Ref), 111 Name: handler.Filename, 112 Size: uint64(uploadBlobResp.Blob.Size), 113 MimeType: uploadBlobResp.Blob.MimeType, 114 } 115 116 err = db.AddArtifact(tx, artifact) 117 if err != nil { 118 log.Println("failed to add artifact record to db", err) 119 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 120 return 121 } 122 123 err = tx.Commit() 124 if err != nil { 125 log.Println("failed to add artifact record to db") 126 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 127 return 128 } 129 130 rp.pages.RepoArtifactFragment(w, pages.RepoArtifactParams{ 131 LoggedInUser: user, 132 RepoInfo: f.RepoInfo(user), 133 Artifact: artifact, 134 }) 135} 136 137func (rp *Repo) DownloadArtifact(w http.ResponseWriter, r *http.Request) { 138 f, err := rp.repoResolver.Resolve(r) 139 if err != nil { 140 log.Println("failed to get repo and knot", err) 141 http.Error(w, "failed to resolve repo", http.StatusInternalServerError) 142 return 143 } 144 145 tagParam := chi.URLParam(r, "tag") 146 filename := chi.URLParam(r, "file") 147 148 tag, err := rp.resolveTag(r.Context(), f, tagParam) 149 if err != nil { 150 log.Println("failed to resolve tag", err) 151 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 152 return 153 } 154 155 artifacts, err := db.GetArtifact( 156 rp.db, 157 db.FilterEq("repo_at", f.RepoAt()), 158 db.FilterEq("tag", tag.Tag.Hash[:]), 159 db.FilterEq("name", filename), 160 ) 161 if err != nil { 162 log.Println("failed to get artifacts", err) 163 http.Error(w, "failed to get artifact", http.StatusInternalServerError) 164 return 165 } 166 167 if len(artifacts) != 1 { 168 log.Printf("too many or too few artifacts found") 169 http.Error(w, "artifact not found", http.StatusNotFound) 170 return 171 } 172 173 artifact := artifacts[0] 174 175 ownerPds := f.OwnerId.PDSEndpoint() 176 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 177 q := url.Query() 178 q.Set("cid", artifact.BlobCid.String()) 179 q.Set("did", artifact.Did) 180 url.RawQuery = q.Encode() 181 182 req, err := http.NewRequest(http.MethodGet, url.String(), nil) 183 if err != nil { 184 log.Println("failed to create request", err) 185 http.Error(w, "failed to create request", http.StatusInternalServerError) 186 return 187 } 188 req.Header.Set("Content-Type", "application/json") 189 190 resp, err := http.DefaultClient.Do(req) 191 if err != nil { 192 log.Println("failed to make request", err) 193 http.Error(w, "failed to make request to PDS", http.StatusInternalServerError) 194 return 195 } 196 defer resp.Body.Close() 197 198 // copy status code and relevant headers from upstream response 199 w.WriteHeader(resp.StatusCode) 200 for key, values := range resp.Header { 201 for _, v := range values { 202 w.Header().Add(key, v) 203 } 204 } 205 206 // stream the body directly to the client 207 if _, err := io.Copy(w, resp.Body); err != nil { 208 log.Println("error streaming response to client:", err) 209 } 210} 211 212// TODO: proper statuses here on early exit 213func (rp *Repo) DeleteArtifact(w http.ResponseWriter, r *http.Request) { 214 user := rp.oauth.GetUser(r) 215 tagParam := chi.URLParam(r, "tag") 216 filename := chi.URLParam(r, "file") 217 f, err := rp.repoResolver.Resolve(r) 218 if err != nil { 219 log.Println("failed to get repo and knot", err) 220 return 221 } 222 223 client, _ := rp.oauth.AuthorizedClient(r) 224 225 tag := plumbing.NewHash(tagParam) 226 227 artifacts, err := db.GetArtifact( 228 rp.db, 229 db.FilterEq("repo_at", f.RepoAt()), 230 db.FilterEq("tag", tag[:]), 231 db.FilterEq("name", filename), 232 ) 233 if err != nil { 234 log.Println("failed to get artifacts", err) 235 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 236 return 237 } 238 if len(artifacts) != 1 { 239 rp.pages.Notice(w, "remove", "Unable to find artifact.") 240 return 241 } 242 243 artifact := artifacts[0] 244 245 if user.Did != artifact.Did { 246 log.Println("user not authorized to delete artifact", err) 247 rp.pages.Notice(w, "remove", "Unauthorized deletion of artifact.") 248 return 249 } 250 251 _, err = client.RepoDeleteRecord(r.Context(), &comatproto.RepoDeleteRecord_Input{ 252 Collection: tangled.RepoArtifactNSID, 253 Repo: user.Did, 254 Rkey: artifact.Rkey, 255 }) 256 if err != nil { 257 log.Println("failed to get blob from pds", err) 258 rp.pages.Notice(w, "remove", "Failed to remove blob from PDS.") 259 return 260 } 261 262 tx, err := rp.db.BeginTx(r.Context(), nil) 263 if err != nil { 264 log.Println("failed to start tx") 265 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 266 return 267 } 268 defer tx.Rollback() 269 270 err = db.DeleteArtifact(tx, 271 db.FilterEq("repo_at", f.RepoAt()), 272 db.FilterEq("tag", artifact.Tag[:]), 273 db.FilterEq("name", filename), 274 ) 275 if err != nil { 276 log.Println("failed to remove artifact record from db", err) 277 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 278 return 279 } 280 281 err = tx.Commit() 282 if err != nil { 283 log.Println("failed to remove artifact record from db") 284 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 285 return 286 } 287 288 w.Write([]byte{}) 289} 290 291func (rp *Repo) resolveTag(ctx context.Context, f *reporesolver.ResolvedRepo, tagParam string) (*types.TagReference, error) { 292 tagParam, err := url.QueryUnescape(tagParam) 293 if err != nil { 294 return nil, err 295 } 296 297 scheme := "http" 298 if !rp.config.Core.Dev { 299 scheme = "https" 300 } 301 host := fmt.Sprintf("%s://%s", scheme, f.Knot) 302 xrpcc := &indigoxrpc.Client{ 303 Host: host, 304 } 305 306 repo := fmt.Sprintf("%s/%s", f.OwnerDid(), f.Name) 307 xrpcBytes, err := tangled.RepoTags(ctx, xrpcc, "", 0, repo) 308 if err != nil { 309 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 310 log.Println("failed to call XRPC repo.tags", xrpcerr) 311 return nil, xrpcerr 312 } 313 log.Println("failed to reach knotserver", err) 314 return nil, err 315 } 316 317 var result types.RepoTagsResponse 318 if err := json.Unmarshal(xrpcBytes, &result); err != nil { 319 log.Println("failed to decode XRPC tags response", err) 320 return nil, err 321 } 322 323 var tag *types.TagReference 324 for _, t := range result.Tags { 325 if t.Tag != nil { 326 if t.Reference.Name == tagParam || t.Reference.Hash == tagParam { 327 tag = t 328 } 329 } 330 } 331 332 if tag == nil { 333 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 334 } 335 336 if tag.Tag.Target.IsZero() { 337 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 338 } 339 340 return tag, nil 341}