1package server
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "time"
9
10 "github.com/Azure/go-autorest/autorest/to"
11 "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/atproto/data"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/carstore"
15 "github.com/bluesky-social/indigo/events"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 "github.com/bluesky-social/indigo/repo"
18 "github.com/bluesky-social/indigo/util"
19 "github.com/haileyok/cocoon/blockstore"
20 "github.com/haileyok/cocoon/models"
21 blocks "github.com/ipfs/go-block-format"
22 "github.com/ipfs/go-cid"
23 cbor "github.com/ipfs/go-ipld-cbor"
24 "github.com/ipld/go-car"
25 "gorm.io/gorm"
26 "gorm.io/gorm/clause"
27)
28
29type RepoMan struct {
30 db *gorm.DB
31 s *Server
32 clock *syntax.TIDClock
33}
34
35func NewRepoMan(s *Server) *RepoMan {
36 clock := syntax.NewTIDClock(0)
37
38 return &RepoMan{
39 s: s,
40 db: s.db,
41 clock: &clock,
42 }
43}
44
// OpType is the "$type" discriminator of a single write operation.
type OpType string

// The write operation kinds understood by applyWrites.
var (
	OpTypeCreate = OpType("com.atproto.repo.applyWrites#create")
	OpTypeUpdate = OpType("com.atproto.repo.applyWrites#update")
	OpTypeDelete = OpType("com.atproto.repo.applyWrites#delete")
)

// String returns the raw type identifier.
//
// The conversion must be explicit: calling ot.String() here would recurse
// forever and overflow the stack.
func (ot OpType) String() string {
	return string(ot)
}
56
57type Op struct {
58 Type OpType `json:"$type"`
59 Collection string `json:"collection"`
60 Rkey *string `json:"rkey,omitempty"`
61 Validate *bool `json:"validate,omitempty"`
62 SwapRecord *string `json:"swapRecord,omitempty"`
63 Record *MarshalableMap `json:"record,omitempty"`
64}
65
// MarshalableMap is a free-form record body keyed by field name. Its pointer
// type carries a MarshalCBOR method so it can be handed to the repo as a
// record.
type MarshalableMap map[string]any
67
68type FirehoseOp struct {
69 Cid cid.Cid
70 Path string
71 Action string
72}
73
74func (mm *MarshalableMap) MarshalCBOR(w io.Writer) error {
75 data, err := data.MarshalCBOR(*mm)
76 if err != nil {
77 return err
78 }
79
80 w.Write(data)
81
82 return nil
83}
84
85type ApplyWriteResult struct {
86 Uri string `json:"uri"`
87 Cid string `json:"cid"`
88 Commit *RepoCommit `json:"commit"`
89 ValidationStatus *string `json:"validationStatus"`
90}
91
// RepoCommit identifies a repository commit by its CID and revision.
type RepoCommit struct {
	// Cid is the string form of the commit CID.
	Cid string `json:"cid"`
	// Rev is the revision string of the commit.
	Rev string `json:"rev"`
}
96
97// TODO make use of swap commit
98func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
99 rootcid, err := cid.Cast(urepo.Root)
100 if err != nil {
101 return nil, err
102 }
103
104 dbs := blockstore.New(urepo.Did, rm.db)
105 r, err := repo.OpenRepo(context.TODO(), dbs, rootcid)
106
107 entries := []models.Record{}
108
109 for i, op := range writes {
110 if op.Type != OpTypeCreate && op.Rkey == nil {
111 return nil, fmt.Errorf("invalid rkey")
112 } else if op.Rkey == nil {
113 op.Rkey = to.StringPtr(rm.clock.Next().String())
114 writes[i].Rkey = op.Rkey
115 }
116
117 _, err := syntax.ParseRecordKey(*op.Rkey)
118 if err != nil {
119 return nil, err
120 }
121
122 switch op.Type {
123 case OpTypeCreate:
124 nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
125 if err != nil {
126 return nil, err
127 }
128
129 d, _ := data.MarshalCBOR(*op.Record)
130 entries = append(entries, models.Record{
131 Did: urepo.Did,
132 CreatedAt: rm.clock.Next().String(),
133 Nsid: op.Collection,
134 Rkey: *op.Rkey,
135 Cid: nc.String(),
136 Value: d,
137 })
138 case OpTypeDelete:
139 err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
140 if err != nil {
141 return nil, err
142 }
143 case OpTypeUpdate:
144 nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
145 if err != nil {
146 return nil, err
147 }
148
149 d, _ := data.MarshalCBOR(*op.Record)
150 entries = append(entries, models.Record{
151 Did: urepo.Did,
152 CreatedAt: rm.clock.Next().String(),
153 Nsid: op.Collection,
154 Rkey: *op.Rkey,
155 Cid: nc.String(),
156 Value: d,
157 })
158 }
159 }
160
161 newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor)
162 if err != nil {
163 return nil, err
164 }
165
166 buf := new(bytes.Buffer)
167
168 hb, err := cbor.DumpObject(&car.CarHeader{
169 Roots: []cid.Cid{newroot},
170 Version: 1,
171 })
172
173 if _, err := carstore.LdWrite(buf, hb); err != nil {
174 return nil, err
175 }
176
177 diffops, err := r.DiffSince(context.TODO(), rootcid)
178 if err != nil {
179 return nil, err
180 }
181
182 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops))
183
184 for _, op := range diffops {
185 switch op.Op {
186 case "add", "mut":
187 kind := "create"
188 if op.Op == "mut" {
189 kind = "update"
190 }
191
192 ll := lexutil.LexLink(op.NewCid)
193 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{
194 Action: kind,
195 Path: op.Rpath,
196 Cid: &ll,
197 })
198
199 case "del":
200 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{
201 Action: "delete",
202 Path: op.Rpath,
203 Cid: nil,
204 })
205 }
206
207 blk, err := dbs.Get(context.TODO(), op.NewCid)
208 if err != nil {
209 return nil, err
210 }
211
212 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
213 return nil, err
214 }
215 }
216
217 for _, op := range dbs.GetLog() {
218 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
219 return nil, err
220 }
221 }
222
223 var results []ApplyWriteResult
224
225 var blobs []lexutil.LexLink
226 for _, entry := range entries {
227 if err := rm.s.db.Clauses(clause.OnConflict{
228 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
229 UpdateAll: true,
230 }).Create(&entry).Error; err != nil {
231 return nil, err
232 }
233
234 // we should actually check the type (i.e. delete, create,., update) here but we'll do it later
235 cids, err := rm.incrementBlobRefs(urepo, entry.Value)
236 if err != nil {
237 return nil, err
238 }
239
240 for _, c := range cids {
241 blobs = append(blobs, lexutil.LexLink(c))
242 }
243
244 results = append(results, ApplyWriteResult{
245 Uri: "at://" + urepo.Did + "/" + entry.Nsid + "/" + entry.Rkey,
246 Cid: entry.Cid,
247 Commit: &RepoCommit{
248 Cid: newroot.String(),
249 Rev: rev,
250 },
251 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
252 })
253 }
254
255 rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
256 RepoCommit: &atproto.SyncSubscribeRepos_Commit{
257 Repo: urepo.Did,
258 Blocks: buf.Bytes(),
259 Blobs: blobs,
260 Rev: rev,
261 Since: &urepo.Rev,
262 Commit: lexutil.LexLink(newroot),
263 Time: time.Now().Format(util.ISO8601),
264 Ops: ops,
265 TooBig: false,
266 },
267 })
268
269 if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil {
270 return nil, err
271 }
272
273 return results, nil
274}
275
276func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
277 c, err := cid.Cast(urepo.Root)
278 if err != nil {
279 return cid.Undef, nil, err
280 }
281
282 dbs := blockstore.New(urepo.Did, rm.db)
283 bs := util.NewLoggingBstore(dbs)
284
285 r, err := repo.OpenRepo(context.TODO(), bs, c)
286 if err != nil {
287 return cid.Undef, nil, err
288 }
289
290 _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey)
291 if err != nil {
292 return cid.Undef, nil, err
293 }
294
295 return c, bs.GetLoggedBlocks(), nil
296}
297
298func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
299 cids, err := getBlobCidsFromCbor(cbor)
300 if err != nil {
301 return nil, err
302 }
303
304 for _, c := range cids {
305 if err := rm.db.Exec("UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", urepo.Did, c.Bytes()).Error; err != nil {
306 return nil, err
307 }
308 }
309
310 return cids, nil
311}
312
313// to be honest, we could just store both the cbor and non-cbor in []entries above to avoid an additional
314// unmarshal here. this will work for now though
315func getBlobCidsFromCbor(cbor []byte) ([]cid.Cid, error) {
316 var cids []cid.Cid
317
318 decoded, err := data.UnmarshalCBOR(cbor)
319 if err != nil {
320 return nil, fmt.Errorf("error unmarshaling cbor: %w", err)
321 }
322
323 var deepiter func(interface{}) error
324 deepiter = func(item interface{}) error {
325 switch val := item.(type) {
326 case map[string]interface{}:
327 if val["$type"] == "blob" {
328 if ref, ok := val["ref"].(string); ok {
329 c, err := cid.Parse(ref)
330 if err != nil {
331 return err
332 }
333 cids = append(cids, c)
334 }
335 for _, v := range val {
336 return deepiter(v)
337 }
338 }
339 case []interface{}:
340 for _, v := range val {
341 deepiter(v)
342 }
343 }
344
345 return nil
346 }
347
348 if err := deepiter(decoded); err != nil {
349 return nil, err
350 }
351
352 return cids, nil
353}