An atproto PDS written in Go
at main 2.9 kB view raw
1package server 2 3import ( 4 "bytes" 5 "context" 6 "io" 7 "slices" 8 "strings" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/indigo/repo" 12 "github.com/haileyok/cocoon/internal/helpers" 13 "github.com/haileyok/cocoon/models" 14 blocks "github.com/ipfs/go-block-format" 15 "github.com/ipfs/go-cid" 16 "github.com/ipld/go-car" 17 "github.com/labstack/echo/v4" 18) 19 20func (s *Server) handleRepoImportRepo(e echo.Context) error { 21 urepo := e.Get("repo").(*models.RepoActor) 22 23 b, err := io.ReadAll(e.Request().Body) 24 if err != nil { 25 s.logger.Error("could not read bytes in import request", "error", err) 26 return helpers.ServerError(e, nil) 27 } 28 29 bs := s.getBlockstore(urepo.Repo.Did) 30 31 cs, err := car.NewCarReader(bytes.NewReader(b)) 32 if err != nil { 33 s.logger.Error("could not read car in import request", "error", err) 34 return helpers.ServerError(e, nil) 35 } 36 37 orderedBlocks := []blocks.Block{} 38 currBlock, err := cs.Next() 39 if err != nil { 40 s.logger.Error("could not get first block from car", "error", err) 41 return helpers.ServerError(e, nil) 42 } 43 currBlockCt := 1 44 45 for currBlock != nil { 46 s.logger.Info("someone is importing their repo", "block", currBlockCt) 47 orderedBlocks = append(orderedBlocks, currBlock) 48 next, _ := cs.Next() 49 currBlock = next 50 currBlockCt++ 51 } 52 53 slices.Reverse(orderedBlocks) 54 55 if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil { 56 s.logger.Error("could not insert blocks", "error", err) 57 return helpers.ServerError(e, nil) 58 } 59 60 r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0]) 61 if err != nil { 62 s.logger.Error("could not open repo", "error", err) 63 return helpers.ServerError(e, nil) 64 } 65 66 tx := s.db.BeginDangerously() 67 68 clock := syntax.NewTIDClock(0) 69 70 if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error { 71 pts := strings.Split(key, "/") 72 nsid := pts[0] 73 rkey := pts[1] 74 cidStr := cid.String() 75 b, err := bs.Get(context.TODO(), cid) 76 if err != nil { 77 s.logger.Error("record bytes don't exist in blockstore", "error", err) 78 return helpers.ServerError(e, nil) 79 } 80 81 rec := models.Record{ 82 Did: urepo.Repo.Did, 83 CreatedAt: clock.Next().String(), 84 Nsid: nsid, 85 Rkey: rkey, 86 Cid: cidStr, 87 Value: b.RawData(), 88 } 89 90 if err := tx.Save(rec).Error; err != nil { 91 return err 92 } 93 94 return nil 95 }); err != nil { 96 tx.Rollback() 97 s.logger.Error("record bytes don't exist in blockstore", "error", err) 98 return helpers.ServerError(e, nil) 99 } 100 101 tx.Commit() 102 103 root, rev, err := r.Commit(context.TODO(), urepo.SignFor) 104 if err != nil { 105 s.logger.Error("error committing", "error", err) 106 return helpers.ServerError(e, nil) 107 } 108 109 if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil { 110 s.logger.Error("error updating repo after commit", "error", err) 111 return helpers.ServerError(e, nil) 112 } 113 114 return nil 115}