1package server
2
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"slices"
	"strings"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/repo"
	"github.com/haileyok/cocoon/internal/helpers"
	"github.com/haileyok/cocoon/models"
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	"github.com/ipld/go-car"
	"github.com/labstack/echo/v4"
)
19
20func (s *Server) handleRepoImportRepo(e echo.Context) error {
21 urepo := e.Get("repo").(*models.RepoActor)
22
23 b, err := io.ReadAll(e.Request().Body)
24 if err != nil {
25 s.logger.Error("could not read bytes in import request", "error", err)
26 return helpers.ServerError(e, nil)
27 }
28
29 bs := s.getBlockstore(urepo.Repo.Did)
30
31 cs, err := car.NewCarReader(bytes.NewReader(b))
32 if err != nil {
33 s.logger.Error("could not read car in import request", "error", err)
34 return helpers.ServerError(e, nil)
35 }
36
37 orderedBlocks := []blocks.Block{}
38 currBlock, err := cs.Next()
39 if err != nil {
40 s.logger.Error("could not get first block from car", "error", err)
41 return helpers.ServerError(e, nil)
42 }
43 currBlockCt := 1
44
45 for currBlock != nil {
46 s.logger.Info("someone is importing their repo", "block", currBlockCt)
47 orderedBlocks = append(orderedBlocks, currBlock)
48 next, _ := cs.Next()
49 currBlock = next
50 currBlockCt++
51 }
52
53 slices.Reverse(orderedBlocks)
54
55 if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil {
56 s.logger.Error("could not insert blocks", "error", err)
57 return helpers.ServerError(e, nil)
58 }
59
60 r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
61 if err != nil {
62 s.logger.Error("could not open repo", "error", err)
63 return helpers.ServerError(e, nil)
64 }
65
66 tx := s.db.BeginDangerously()
67
68 clock := syntax.NewTIDClock(0)
69
70 if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error {
71 pts := strings.Split(key, "/")
72 nsid := pts[0]
73 rkey := pts[1]
74 cidStr := cid.String()
75 b, err := bs.Get(context.TODO(), cid)
76 if err != nil {
77 s.logger.Error("record bytes don't exist in blockstore", "error", err)
78 return helpers.ServerError(e, nil)
79 }
80
81 rec := models.Record{
82 Did: urepo.Repo.Did,
83 CreatedAt: clock.Next().String(),
84 Nsid: nsid,
85 Rkey: rkey,
86 Cid: cidStr,
87 Value: b.RawData(),
88 }
89
90 if err := tx.Save(rec).Error; err != nil {
91 return err
92 }
93
94 return nil
95 }); err != nil {
96 tx.Rollback()
97 s.logger.Error("record bytes don't exist in blockstore", "error", err)
98 return helpers.ServerError(e, nil)
99 }
100
101 tx.Commit()
102
103 root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
104 if err != nil {
105 s.logger.Error("error committing", "error", err)
106 return helpers.ServerError(e, nil)
107 }
108
109 if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil {
110 s.logger.Error("error updating repo after commit", "error", err)
111 return helpers.ServerError(e, nil)
112 }
113
114 return nil
115}