1package server
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "time"
9
10 "github.com/Azure/go-autorest/autorest/to"
11 "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/atproto/data"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/carstore"
15 "github.com/bluesky-social/indigo/events"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 "github.com/bluesky-social/indigo/repo"
18 "github.com/bluesky-social/indigo/util"
19 "github.com/haileyok/cocoon/blockstore"
20 "github.com/haileyok/cocoon/models"
21 blocks "github.com/ipfs/go-block-format"
22 "github.com/ipfs/go-cid"
23 cbor "github.com/ipfs/go-ipld-cbor"
24 "github.com/ipld/go-car"
25 "gorm.io/gorm"
26 "gorm.io/gorm/clause"
27)
28
29type RepoMan struct {
30 db *gorm.DB
31 s *Server
32 clock *syntax.TIDClock
33}
34
35func NewRepoMan(s *Server) *RepoMan {
36 clock := syntax.NewTIDClock(0)
37
38 return &RepoMan{
39 s: s,
40 db: s.db,
41 clock: &clock,
42 }
43}
44
// OpType names the kind of write carried by an Op. The values are the
// "$type" strings of the com.atproto.repo.applyWrites operations.
type OpType string

// Write operation kinds understood by applyWrites.
var (
	OpTypeCreate = OpType("com.atproto.repo.applyWrites#create")
	OpTypeUpdate = OpType("com.atproto.repo.applyWrites#update")
	OpTypeDelete = OpType("com.atproto.repo.applyWrites#delete")
)

// String returns the operation type as a plain string.
//
// The conversion is done directly on the underlying string; calling
// ot.String() here would recurse forever.
func (ot OpType) String() string {
	return string(ot)
}
56
57type Op struct {
58 Type OpType `json:"$type"`
59 Collection string `json:"collection"`
60 Rkey *string `json:"rkey,omitempty"`
61 Validate *bool `json:"validate,omitempty"`
62 SwapRecord *string `json:"swapRecord,omitempty"`
63 Record *MarshalableMap `json:"record,omitempty"`
64}
65
// MarshalableMap is a generic string-keyed record body. Its pointer type
// provides MarshalCBOR so a record can be handed to the repo as-is.
type MarshalableMap map[string]any
67
68type FirehoseOp struct {
69 Cid cid.Cid
70 Path string
71 Action string
72}
73
74func (mm *MarshalableMap) MarshalCBOR(w io.Writer) error {
75 data, err := data.MarshalCBOR(*mm)
76 if err != nil {
77 return err
78 }
79
80 w.Write(data)
81
82 return nil
83}
84
85// TODO make use of swap commit
86func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) error {
87 rootcid, err := cid.Cast(urepo.Root)
88 if err != nil {
89 return err
90 }
91
92 dbs := blockstore.New(urepo.Did, rm.db)
93 r, err := repo.OpenRepo(context.TODO(), dbs, rootcid)
94
95 entries := []models.Record{}
96
97 for i, op := range writes {
98 if op.Type != OpTypeCreate && op.Rkey == nil {
99 return fmt.Errorf("invalid rkey")
100 } else if op.Rkey == nil {
101 op.Rkey = to.StringPtr(rm.clock.Next().String())
102 writes[i].Rkey = op.Rkey
103 }
104
105 _, err := syntax.ParseRecordKey(*op.Rkey)
106 if err != nil {
107 return err
108 }
109
110 switch op.Type {
111 case OpTypeCreate:
112 nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
113 if err != nil {
114 return err
115 }
116
117 d, _ := data.MarshalCBOR(*op.Record)
118 entries = append(entries, models.Record{
119 Did: urepo.Did,
120 CreatedAt: rm.clock.Next().String(),
121 Nsid: op.Collection,
122 Rkey: *op.Rkey,
123 Cid: nc.String(),
124 Value: d,
125 })
126 case OpTypeDelete:
127 err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
128 if err != nil {
129 return err
130 }
131 case OpTypeUpdate:
132 nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
133 if err != nil {
134 return err
135 }
136
137 d, _ := data.MarshalCBOR(*op.Record)
138 entries = append(entries, models.Record{
139 Did: urepo.Did,
140 CreatedAt: rm.clock.Next().String(),
141 Nsid: op.Collection,
142 Rkey: *op.Rkey,
143 Cid: nc.String(),
144 Value: d,
145 })
146 }
147 }
148
149 newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor)
150 if err != nil {
151 return err
152 }
153
154 buf := new(bytes.Buffer)
155
156 hb, err := cbor.DumpObject(&car.CarHeader{
157 Roots: []cid.Cid{newroot},
158 Version: 1,
159 })
160
161 if _, err := carstore.LdWrite(buf, hb); err != nil {
162 return err
163 }
164
165 diffops, err := r.DiffSince(context.TODO(), rootcid)
166 if err != nil {
167 return err
168 }
169
170 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops))
171
172 for _, op := range diffops {
173 switch op.Op {
174 case "add", "mut":
175 kind := "create"
176 if op.Op == "mut" {
177 kind = "update"
178 }
179
180 ll := lexutil.LexLink(op.NewCid)
181 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{
182 Action: kind,
183 Path: op.Rpath,
184 Cid: &ll,
185 })
186
187 case "del":
188 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{
189 Action: "delete",
190 Path: op.Rpath,
191 Cid: nil,
192 })
193 }
194
195 blk, err := dbs.Get(context.TODO(), op.NewCid)
196 if err != nil {
197 return err
198 }
199
200 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
201 return err
202 }
203 }
204
205 for _, op := range dbs.GetLog() {
206 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
207 return err
208 }
209 }
210
211 var blobs []lexutil.LexLink
212 for _, entry := range entries {
213 if err := rm.s.db.Clauses(clause.OnConflict{
214 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
215 UpdateAll: true,
216 }).Create(&entry).Error; err != nil {
217 return err
218 }
219
220 // we should actually check the type (i.e. delete, create,., update) here but we'll do it later
221 cids, err := rm.incrementBlobRefs(urepo, entry.Value)
222 if err != nil {
223 return err
224 }
225
226 for _, c := range cids {
227 blobs = append(blobs, lexutil.LexLink(c))
228 }
229 }
230
231 rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
232 RepoCommit: &atproto.SyncSubscribeRepos_Commit{
233 Repo: urepo.Did,
234 Blocks: buf.Bytes(),
235 Blobs: blobs,
236 Rev: rev,
237 Since: &urepo.Rev,
238 Commit: lexutil.LexLink(newroot),
239 Time: time.Now().Format(util.ISO8601),
240 Ops: ops,
241 TooBig: false,
242 },
243 })
244
245 if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil {
246 return err
247 }
248
249 return nil
250}
251
252func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
253 c, err := cid.Cast(urepo.Root)
254 if err != nil {
255 return cid.Undef, nil, err
256 }
257
258 dbs := blockstore.New(urepo.Did, rm.db)
259 bs := util.NewLoggingBstore(dbs)
260
261 r, err := repo.OpenRepo(context.TODO(), bs, c)
262 if err != nil {
263 return cid.Undef, nil, err
264 }
265
266 _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey)
267 if err != nil {
268 return cid.Undef, nil, err
269 }
270
271 return c, bs.GetLoggedBlocks(), nil
272}
273
274func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
275 cids, err := getBlobCidsFromCbor(cbor)
276 if err != nil {
277 return nil, err
278 }
279
280 for _, c := range cids {
281 if err := rm.db.Exec("UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", urepo.Did, c.Bytes()).Error; err != nil {
282 return nil, err
283 }
284 }
285
286 return cids, nil
287}
288
289// to be honest, we could just store both the cbor and non-cbor in []entries above to avoid an additional
290// unmarshal here. this will work for now though
291func getBlobCidsFromCbor(cbor []byte) ([]cid.Cid, error) {
292 var cids []cid.Cid
293
294 decoded, err := data.UnmarshalCBOR(cbor)
295 if err != nil {
296 return nil, fmt.Errorf("error unmarshaling cbor: %w", err)
297 }
298
299 var deepiter func(interface{}) error
300 deepiter = func(item interface{}) error {
301 switch val := item.(type) {
302 case map[string]interface{}:
303 if val["$type"] == "blob" {
304 if ref, ok := val["ref"].(string); ok {
305 c, err := cid.Parse(ref)
306 if err != nil {
307 return err
308 }
309 cids = append(cids, c)
310 }
311 for _, v := range val {
312 return deepiter(v)
313 }
314 }
315 case []interface{}:
316 for _, v := range val {
317 deepiter(v)
318 }
319 }
320
321 return nil
322 }
323
324 if err := deepiter(decoded); err != nil {
325 return nil, err
326 }
327
328 return cids, nil
329}