An atproto PDS written in Go
at hailey/tidy 2.8 kB view raw
1package server 2 3import ( 4 "bytes" 5 "io" 6 "slices" 7 "strings" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "github.com/bluesky-social/indigo/repo" 11 "github.com/haileyok/cocoon/internal/helpers" 12 "github.com/haileyok/cocoon/models" 13 blocks "github.com/ipfs/go-block-format" 14 "github.com/ipfs/go-cid" 15 "github.com/ipld/go-car" 16 "github.com/labstack/echo/v4" 17) 18 19func (s *Server) handleRepoImportRepo(e echo.Context) error { 20 ctx := e.Request().Context() 21 22 urepo := e.Get("repo").(*models.RepoActor) 23 24 b, err := io.ReadAll(e.Request().Body) 25 if err != nil { 26 s.logger.Error("could not read bytes in import request", "error", err) 27 return helpers.ServerError(e, nil) 28 } 29 30 bs := s.getBlockstore(urepo.Repo.Did) 31 32 cs, err := car.NewCarReader(bytes.NewReader(b)) 33 if err != nil { 34 s.logger.Error("could not read car in import request", "error", err) 35 return helpers.ServerError(e, nil) 36 } 37 38 orderedBlocks := []blocks.Block{} 39 currBlock, err := cs.Next() 40 if err != nil { 41 s.logger.Error("could not get first block from car", "error", err) 42 return helpers.ServerError(e, nil) 43 } 44 currBlockCt := 1 45 46 for currBlock != nil { 47 s.logger.Info("someone is importing their repo", "block", currBlockCt) 48 orderedBlocks = append(orderedBlocks, currBlock) 49 next, _ := cs.Next() 50 currBlock = next 51 currBlockCt++ 52 } 53 54 slices.Reverse(orderedBlocks) 55 56 if err := bs.PutMany(ctx, orderedBlocks); err != nil { 57 s.logger.Error("could not insert blocks", "error", err) 58 return helpers.ServerError(e, nil) 59 } 60 61 r, err := repo.OpenRepo(ctx, bs, cs.Header.Roots[0]) 62 if err != nil { 63 s.logger.Error("could not open repo", "error", err) 64 return helpers.ServerError(e, nil) 65 } 66 67 tx := s.db.BeginDangerously() 68 69 clock := syntax.NewTIDClock(0) 70 71 if err := r.ForEach(ctx, "", func(key string, cid cid.Cid) error { 72 pts := strings.Split(key, "/") 73 nsid := pts[0] 74 rkey := pts[1] 75 cidStr := cid.String() 76 b, err := bs.Get(ctx, cid) 77 if err != nil { 78 s.logger.Error("record bytes don't exist in blockstore", "error", err) 79 return helpers.ServerError(e, nil) 80 } 81 82 rec := models.Record{ 83 Did: urepo.Repo.Did, 84 CreatedAt: clock.Next().String(), 85 Nsid: nsid, 86 Rkey: rkey, 87 Cid: cidStr, 88 Value: b.RawData(), 89 } 90 91 if err := tx.Save(rec).Error; err != nil { 92 return err 93 } 94 95 return nil 96 }); err != nil { 97 tx.Rollback() 98 s.logger.Error("record bytes don't exist in blockstore", "error", err) 99 return helpers.ServerError(e, nil) 100 } 101 102 tx.Commit() 103 104 root, rev, err := r.Commit(ctx, urepo.SignFor) 105 if err != nil { 106 s.logger.Error("error committing", "error", err) 107 return helpers.ServerError(e, nil) 108 } 109 110 if err := s.UpdateRepo(ctx, urepo.Repo.Did, root, rev); err != nil { 111 s.logger.Error("error updating repo after commit", "error", err) 112 return helpers.ServerError(e, nil) 113 } 114 115 return nil 116}