An atproto PDS written in Go
1package server 2 3import ( 4 "bytes" 5 "io" 6 7 "github.com/haileyok/cocoon/internal/helpers" 8 "github.com/haileyok/cocoon/models" 9 "github.com/ipfs/go-cid" 10 "github.com/labstack/echo/v4" 11 "github.com/multiformats/go-multihash" 12) 13 14const ( 15 blockSize = 0x10000 16) 17 18type ComAtprotoRepoUploadBlobResponse struct { 19 Blob struct { 20 Type string `json:"$type"` 21 Ref struct { 22 Link string `json:"$link"` 23 } `json:"ref"` 24 MimeType string `json:"mimeType"` 25 Size int `json:"size"` 26 } `json:"blob"` 27} 28 29func (s *Server) handleRepoUploadBlob(e echo.Context) error { 30 urepo := e.Get("repo").(*models.RepoActor) 31 32 mime := e.Request().Header.Get("content-type") 33 if mime == "" { 34 mime = "application/octet-stream" 35 } 36 37 blob := models.Blob{ 38 Did: urepo.Repo.Did, 39 RefCount: 0, 40 CreatedAt: s.repoman.clock.Next().String(), 41 } 42 43 if err := s.db.Create(&blob, nil).Error; err != nil { 44 s.logger.Error("error creating new blob in db", "error", err) 45 return helpers.ServerError(e, nil) 46 } 47 48 read := 0 49 part := 0 50 51 buf := make([]byte, 0x10000) 52 fulldata := new(bytes.Buffer) 53 54 for { 55 n, err := io.ReadFull(e.Request().Body, buf) 56 if err == io.ErrUnexpectedEOF || err == io.EOF { 57 if n == 0 { 58 break 59 } 60 } else if err != nil && err != io.ErrUnexpectedEOF { 61 s.logger.Error("error reading blob", "error", err) 62 return helpers.ServerError(e, nil) 63 } 64 65 data := buf[:n] 66 read += n 67 fulldata.Write(data) 68 69 blobPart := models.BlobPart{ 70 BlobID: blob.ID, 71 Idx: part, 72 Data: data, 73 } 74 75 if err := s.db.Create(&blobPart, nil).Error; err != nil { 76 s.logger.Error("error adding blob part to db", "error", err) 77 return helpers.ServerError(e, nil) 78 } 79 part++ 80 81 if n < blockSize { 82 break 83 } 84 } 85 86 c, err := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256).Sum(fulldata.Bytes()) 87 if err != nil { 88 s.logger.Error("error creating cid prefix", "error", err) 89 return helpers.ServerError(e, nil) 90 } 91 92 if err := s.db.Exec("UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil { 93 // there should probably be somme handling here if this fails... 94 s.logger.Error("error updating blob", "error", err) 95 return helpers.ServerError(e, nil) 96 } 97 98 resp := ComAtprotoRepoUploadBlobResponse{} 99 resp.Blob.Type = "blob" 100 resp.Blob.Ref.Link = c.String() 101 resp.Blob.MimeType = mime 102 resp.Blob.Size = read 103 104 return e.JSON(200, resp) 105}