1package server
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "time"
9
10 "github.com/Azure/go-autorest/autorest/to"
11 "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/atproto/data"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/carstore"
15 "github.com/bluesky-social/indigo/events"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 "github.com/bluesky-social/indigo/repo"
18 "github.com/bluesky-social/indigo/util"
19 "github.com/haileyok/cocoon/blockstore"
20 "github.com/haileyok/cocoon/models"
21 blocks "github.com/ipfs/go-block-format"
22 "github.com/ipfs/go-cid"
23 cbor "github.com/ipfs/go-ipld-cbor"
24 "github.com/ipld/go-car"
25 "gorm.io/gorm"
26 "gorm.io/gorm/clause"
27)
28
29type RepoMan struct {
30 db *gorm.DB
31 s *Server
32 clock *syntax.TIDClock
33}
34
35func NewRepoMan(s *Server) *RepoMan {
36 clock := syntax.NewTIDClock(0)
37
38 return &RepoMan{
39 s: s,
40 db: s.db,
41 clock: &clock,
42 }
43}
44
45type OpType string
46
47var (
48 OpTypeCreate = OpType("com.atproto.repo.applyWrites#create")
49 OpTypeUpdate = OpType("com.atproto.repo.applyWrites#update")
50 OpTypeDelete = OpType("com.atproto.repo.applyWrites#delete")
51)
52
53func (ot OpType) String() string {
54 return string(ot)
55}
56
57type Op struct {
58 Type OpType `json:"$type"`
59 Collection string `json:"collection"`
60 Rkey *string `json:"rkey,omitempty"`
61 Validate *bool `json:"validate,omitempty"`
62 SwapRecord *string `json:"swapRecord,omitempty"`
63 Record *MarshalableMap `json:"record,omitempty"`
64}
65
66type MarshalableMap map[string]any
67
68type FirehoseOp struct {
69 Cid cid.Cid
70 Path string
71 Action string
72}
73
74func (mm *MarshalableMap) MarshalCBOR(w io.Writer) error {
75 data, err := data.MarshalCBOR(*mm)
76 if err != nil {
77 return err
78 }
79
80 w.Write(data)
81
82 return nil
83}
84
85type ApplyWriteResult struct {
86 Type *string `json:"$type,omitempty"`
87 Uri string `json:"uri"`
88 Cid string `json:"cid"`
89 Commit *RepoCommit `json:"commit,omitempty"`
90 ValidationStatus *string `json:"validationStatus"`
91}
92
93type RepoCommit struct {
94 Cid string `json:"cid"`
95 Rev string `json:"rev"`
96}
97
98// TODO make use of swap commit
99func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
100 rootcid, err := cid.Cast(urepo.Root)
101 if err != nil {
102 return nil, err
103 }
104
105 dbs := blockstore.New(urepo.Did, rm.db)
106 r, err := repo.OpenRepo(context.TODO(), dbs, rootcid)
107
108 entries := []models.Record{}
109 var results []ApplyWriteResult
110
111 for i, op := range writes {
112 if op.Type != OpTypeCreate && op.Rkey == nil {
113 return nil, fmt.Errorf("invalid rkey")
114 } else if op.Rkey == nil {
115 op.Rkey = to.StringPtr(rm.clock.Next().String())
116 writes[i].Rkey = op.Rkey
117 }
118
119 _, err := syntax.ParseRecordKey(*op.Rkey)
120 if err != nil {
121 return nil, err
122 }
123
124 switch op.Type {
125 case OpTypeCreate:
126 nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
127 if err != nil {
128 return nil, err
129 }
130
131 d, _ := data.MarshalCBOR(*op.Record)
132 entries = append(entries, models.Record{
133 Did: urepo.Did,
134 CreatedAt: rm.clock.Next().String(),
135 Nsid: op.Collection,
136 Rkey: *op.Rkey,
137 Cid: nc.String(),
138 Value: d,
139 })
140 results = append(results, ApplyWriteResult{
141 Type: to.StringPtr(OpTypeCreate.String()),
142 Uri: "at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey,
143 Cid: nc.String(),
144 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
145 })
146 case OpTypeDelete:
147 err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
148 if err != nil {
149 return nil, err
150 }
151 case OpTypeUpdate:
152 nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
153 if err != nil {
154 return nil, err
155 }
156
157 d, _ := data.MarshalCBOR(*op.Record)
158 entries = append(entries, models.Record{
159 Did: urepo.Did,
160 CreatedAt: rm.clock.Next().String(),
161 Nsid: op.Collection,
162 Rkey: *op.Rkey,
163 Cid: nc.String(),
164 Value: d,
165 })
166 results = append(results, ApplyWriteResult{
167 Type: to.StringPtr(OpTypeUpdate.String()),
168 Uri: "at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey,
169 Cid: nc.String(),
170 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
171 })
172 }
173 }
174
175 newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor)
176 if err != nil {
177 return nil, err
178 }
179
180 buf := new(bytes.Buffer)
181
182 hb, err := cbor.DumpObject(&car.CarHeader{
183 Roots: []cid.Cid{newroot},
184 Version: 1,
185 })
186
187 if _, err := carstore.LdWrite(buf, hb); err != nil {
188 return nil, err
189 }
190
191 diffops, err := r.DiffSince(context.TODO(), rootcid)
192 if err != nil {
193 return nil, err
194 }
195
196 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops))
197
198 for _, op := range diffops {
199 switch op.Op {
200 case "add", "mut":
201 kind := "create"
202 if op.Op == "mut" {
203 kind = "update"
204 }
205
206 ll := lexutil.LexLink(op.NewCid)
207 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{
208 Action: kind,
209 Path: op.Rpath,
210 Cid: &ll,
211 })
212
213 case "del":
214 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{
215 Action: "delete",
216 Path: op.Rpath,
217 Cid: nil,
218 })
219 }
220
221 blk, err := dbs.Get(context.TODO(), op.NewCid)
222 if err != nil {
223 return nil, err
224 }
225
226 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
227 return nil, err
228 }
229 }
230
231 for _, op := range dbs.GetLog() {
232 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
233 return nil, err
234 }
235 }
236
237 var blobs []lexutil.LexLink
238 for _, entry := range entries {
239 if err := rm.s.db.Clauses(clause.OnConflict{
240 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
241 UpdateAll: true,
242 }).Create(&entry).Error; err != nil {
243 return nil, err
244 }
245
246 // we should actually check the type (i.e. delete, create,., update) here but we'll do it later
247 cids, err := rm.incrementBlobRefs(urepo, entry.Value)
248 if err != nil {
249 return nil, err
250 }
251
252 for _, c := range cids {
253 blobs = append(blobs, lexutil.LexLink(c))
254 }
255 }
256
257 rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
258 RepoCommit: &atproto.SyncSubscribeRepos_Commit{
259 Repo: urepo.Did,
260 Blocks: buf.Bytes(),
261 Blobs: blobs,
262 Rev: rev,
263 Since: &urepo.Rev,
264 Commit: lexutil.LexLink(newroot),
265 Time: time.Now().Format(util.ISO8601),
266 Ops: ops,
267 TooBig: false,
268 },
269 })
270
271 if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil {
272 return nil, err
273 }
274
275 for i := range results {
276 results[i].Type = to.StringPtr(*results[i].Type + "Result")
277 results[i].Commit = &RepoCommit{
278 Cid: newroot.String(),
279 Rev: rev,
280 }
281 }
282
283 return results, nil
284}
285
286func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
287 c, err := cid.Cast(urepo.Root)
288 if err != nil {
289 return cid.Undef, nil, err
290 }
291
292 dbs := blockstore.New(urepo.Did, rm.db)
293 bs := util.NewLoggingBstore(dbs)
294
295 r, err := repo.OpenRepo(context.TODO(), bs, c)
296 if err != nil {
297 return cid.Undef, nil, err
298 }
299
300 _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey)
301 if err != nil {
302 return cid.Undef, nil, err
303 }
304
305 return c, bs.GetLoggedBlocks(), nil
306}
307
308func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
309 cids, err := getBlobCidsFromCbor(cbor)
310 if err != nil {
311 return nil, err
312 }
313
314 for _, c := range cids {
315 if err := rm.db.Exec("UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", urepo.Did, c.Bytes()).Error; err != nil {
316 return nil, err
317 }
318 }
319
320 return cids, nil
321}
322
323// to be honest, we could just store both the cbor and non-cbor in []entries above to avoid an additional
324// unmarshal here. this will work for now though
325func getBlobCidsFromCbor(cbor []byte) ([]cid.Cid, error) {
326 var cids []cid.Cid
327
328 decoded, err := data.UnmarshalCBOR(cbor)
329 if err != nil {
330 return nil, fmt.Errorf("error unmarshaling cbor: %w", err)
331 }
332
333 var deepiter func(interface{}) error
334 deepiter = func(item interface{}) error {
335 switch val := item.(type) {
336 case map[string]interface{}:
337 if val["$type"] == "blob" {
338 if ref, ok := val["ref"].(string); ok {
339 c, err := cid.Parse(ref)
340 if err != nil {
341 return err
342 }
343 cids = append(cids, c)
344 }
345 for _, v := range val {
346 return deepiter(v)
347 }
348 }
349 case []interface{}:
350 for _, v := range val {
351 deepiter(v)
352 }
353 }
354
355 return nil
356 }
357
358 if err := deepiter(decoded); err != nil {
359 return nil, err
360 }
361
362 return cids, nil
363}