forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package repo 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log" 9 "net/http" 10 "net/url" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/db" 15 "tangled.org/core/appview/models" 16 "tangled.org/core/appview/pages" 17 "tangled.org/core/appview/reporesolver" 18 "tangled.org/core/appview/xrpcclient" 19 "tangled.org/core/tid" 20 "tangled.org/core/types" 21 22 comatproto "github.com/bluesky-social/indigo/api/atproto" 23 lexutil "github.com/bluesky-social/indigo/lex/util" 24 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 25 "github.com/dustin/go-humanize" 26 "github.com/go-chi/chi/v5" 27 "github.com/go-git/go-git/v5/plumbing" 28 "github.com/ipfs/go-cid" 29) 30 31// TODO: proper statuses here on early exit 32func (rp *Repo) AttachArtifact(w http.ResponseWriter, r *http.Request) { 33 user := rp.oauth.GetUser(r) 34 tagParam := chi.URLParam(r, "tag") 35 f, err := rp.repoResolver.Resolve(r) 36 if err != nil { 37 log.Println("failed to get repo and knot", err) 38 rp.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution") 39 return 40 } 41 42 tag, err := rp.resolveTag(r.Context(), f, tagParam) 43 if err != nil { 44 log.Println("failed to resolve tag", err) 45 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 46 return 47 } 48 49 file, handler, err := r.FormFile("artifact") 50 if err != nil { 51 log.Println("failed to upload artifact", err) 52 rp.pages.Notice(w, "upload", "failed to upload artifact") 53 return 54 } 55 defer file.Close() 56 57 client, err := rp.oauth.AuthorizedClient(r) 58 if err != nil { 59 log.Println("failed to get authorized client", err) 60 rp.pages.Notice(w, "upload", "failed to get authorized client") 61 return 62 } 63 64 uploadBlobResp, err := comatproto.RepoUploadBlob(r.Context(), client, file) 65 if err != nil { 66 log.Println("failed to upload blob", err) 67 rp.pages.Notice(w, "upload", "Failed to upload blob to your PDS. Try again later.") 68 return 69 } 70 71 log.Println("uploaded blob", humanize.Bytes(uint64(uploadBlobResp.Blob.Size)), uploadBlobResp.Blob.Ref.String()) 72 73 rkey := tid.TID() 74 createdAt := time.Now() 75 76 putRecordResp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 77 Collection: tangled.RepoArtifactNSID, 78 Repo: user.Did, 79 Rkey: rkey, 80 Record: &lexutil.LexiconTypeDecoder{ 81 Val: &tangled.RepoArtifact{ 82 Artifact: uploadBlobResp.Blob, 83 CreatedAt: createdAt.Format(time.RFC3339), 84 Name: handler.Filename, 85 Repo: f.RepoAt().String(), 86 Tag: tag.Tag.Hash[:], 87 }, 88 }, 89 }) 90 if err != nil { 91 log.Println("failed to create record", err) 92 rp.pages.Notice(w, "upload", "Failed to create artifact record. Try again later.") 93 return 94 } 95 96 log.Println(putRecordResp.Uri) 97 98 tx, err := rp.db.BeginTx(r.Context(), nil) 99 if err != nil { 100 log.Println("failed to start tx") 101 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 102 return 103 } 104 defer tx.Rollback() 105 106 artifact := models.Artifact{ 107 Did: user.Did, 108 Rkey: rkey, 109 RepoAt: f.RepoAt(), 110 Tag: tag.Tag.Hash, 111 CreatedAt: createdAt, 112 BlobCid: cid.Cid(uploadBlobResp.Blob.Ref), 113 Name: handler.Filename, 114 Size: uint64(uploadBlobResp.Blob.Size), 115 MimeType: uploadBlobResp.Blob.MimeType, 116 } 117 118 err = db.AddArtifact(tx, artifact) 119 if err != nil { 120 log.Println("failed to add artifact record to db", err) 121 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 122 return 123 } 124 125 err = tx.Commit() 126 if err != nil { 127 log.Println("failed to add artifact record to db") 128 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 129 return 130 } 131 132 rp.pages.RepoArtifactFragment(w, pages.RepoArtifactParams{ 133 LoggedInUser: user, 134 RepoInfo: f.RepoInfo(user), 135 Artifact: artifact, 136 }) 137} 138 139func (rp *Repo) DownloadArtifact(w http.ResponseWriter, r *http.Request) { 140 f, err := rp.repoResolver.Resolve(r) 141 if err != nil { 142 log.Println("failed to get repo and knot", err) 143 http.Error(w, "failed to resolve repo", http.StatusInternalServerError) 144 return 145 } 146 147 tagParam := chi.URLParam(r, "tag") 148 filename := chi.URLParam(r, "file") 149 150 tag, err := rp.resolveTag(r.Context(), f, tagParam) 151 if err != nil { 152 log.Println("failed to resolve tag", err) 153 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 154 return 155 } 156 157 artifacts, err := db.GetArtifact( 158 rp.db, 159 db.FilterEq("repo_at", f.RepoAt()), 160 db.FilterEq("tag", tag.Tag.Hash[:]), 161 db.FilterEq("name", filename), 162 ) 163 if err != nil { 164 log.Println("failed to get artifacts", err) 165 http.Error(w, "failed to get artifact", http.StatusInternalServerError) 166 return 167 } 168 169 if len(artifacts) != 1 { 170 log.Printf("too many or too few artifacts found") 171 http.Error(w, "artifact not found", http.StatusNotFound) 172 return 173 } 174 175 artifact := artifacts[0] 176 177 ownerPds := f.OwnerId.PDSEndpoint() 178 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 179 q := url.Query() 180 q.Set("cid", artifact.BlobCid.String()) 181 q.Set("did", artifact.Did) 182 url.RawQuery = q.Encode() 183 184 req, err := http.NewRequest(http.MethodGet, url.String(), nil) 185 if err != nil { 186 log.Println("failed to create request", err) 187 http.Error(w, "failed to create request", http.StatusInternalServerError) 188 return 189 } 190 req.Header.Set("Content-Type", "application/json") 191 192 resp, err := http.DefaultClient.Do(req) 193 if err != nil { 194 log.Println("failed to make request", err) 195 http.Error(w, "failed to make request to PDS", http.StatusInternalServerError) 196 return 197 } 198 defer resp.Body.Close() 199 200 // copy status code and relevant headers from upstream response 201 w.WriteHeader(resp.StatusCode) 202 for key, values := range resp.Header { 203 for _, v := range values { 204 w.Header().Add(key, v) 205 } 206 } 207 208 // stream the body directly to the client 209 if _, err := io.Copy(w, resp.Body); err != nil { 210 log.Println("error streaming response to client:", err) 211 } 212} 213 214// TODO: proper statuses here on early exit 215func (rp *Repo) DeleteArtifact(w http.ResponseWriter, r *http.Request) { 216 user := rp.oauth.GetUser(r) 217 tagParam := chi.URLParam(r, "tag") 218 filename := chi.URLParam(r, "file") 219 f, err := rp.repoResolver.Resolve(r) 220 if err != nil { 221 log.Println("failed to get repo and knot", err) 222 return 223 } 224 225 client, _ := rp.oauth.AuthorizedClient(r) 226 227 tag := plumbing.NewHash(tagParam) 228 229 artifacts, err := db.GetArtifact( 230 rp.db, 231 db.FilterEq("repo_at", f.RepoAt()), 232 db.FilterEq("tag", tag[:]), 233 db.FilterEq("name", filename), 234 ) 235 if err != nil { 236 log.Println("failed to get artifacts", err) 237 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 238 return 239 } 240 if len(artifacts) != 1 { 241 rp.pages.Notice(w, "remove", "Unable to find artifact.") 242 return 243 } 244 245 artifact := artifacts[0] 246 247 if user.Did != artifact.Did { 248 log.Println("user not authorized to delete artifact", err) 249 rp.pages.Notice(w, "remove", "Unauthorized deletion of artifact.") 250 return 251 } 252 253 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 254 Collection: tangled.RepoArtifactNSID, 255 Repo: user.Did, 256 Rkey: artifact.Rkey, 257 }) 258 if err != nil { 259 log.Println("failed to get blob from pds", err) 260 rp.pages.Notice(w, "remove", "Failed to remove blob from PDS.") 261 return 262 } 263 264 tx, err := rp.db.BeginTx(r.Context(), nil) 265 if err != nil { 266 log.Println("failed to start tx") 267 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 268 return 269 } 270 defer tx.Rollback() 271 272 err = db.DeleteArtifact(tx, 273 db.FilterEq("repo_at", f.RepoAt()), 274 db.FilterEq("tag", artifact.Tag[:]), 275 db.FilterEq("name", filename), 276 ) 277 if err != nil { 278 log.Println("failed to remove artifact record from db", err) 279 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 280 return 281 } 282 283 err = tx.Commit() 284 if err != nil { 285 log.Println("failed to remove artifact record from db") 286 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 287 return 288 } 289 290 w.Write([]byte{}) 291} 292 293func (rp *Repo) resolveTag(ctx context.Context, f *reporesolver.ResolvedRepo, tagParam string) (*types.TagReference, error) { 294 tagParam, err := url.QueryUnescape(tagParam) 295 if err != nil { 296 return nil, err 297 } 298 299 scheme := "http" 300 if !rp.config.Core.Dev { 301 scheme = "https" 302 } 303 host := fmt.Sprintf("%s://%s", scheme, f.Knot) 304 xrpcc := &indigoxrpc.Client{ 305 Host: host, 306 } 307 308 repo := fmt.Sprintf("%s/%s", f.OwnerDid(), f.Name) 309 xrpcBytes, err := tangled.RepoTags(ctx, xrpcc, "", 0, repo) 310 if err != nil { 311 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 312 log.Println("failed to call XRPC repo.tags", xrpcerr) 313 return nil, xrpcerr 314 } 315 log.Println("failed to reach knotserver", err) 316 return nil, err 317 } 318 319 var result types.RepoTagsResponse 320 if err := json.Unmarshal(xrpcBytes, &result); err != nil { 321 log.Println("failed to decode XRPC tags response", err) 322 return nil, err 323 } 324 325 var tag *types.TagReference 326 for _, t := range result.Tags { 327 if t.Tag != nil { 328 if t.Reference.Name == tagParam || t.Reference.Hash == tagParam { 329 tag = t 330 } 331 } 332 } 333 334 if tag == nil { 335 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 336 } 337 338 if tag.Tag.Target.IsZero() { 339 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 340 } 341 342 return tag, nil 343}