1package server
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "time"
9
10 "github.com/Azure/go-autorest/autorest/to"
11 "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/atproto/data"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/carstore"
15 "github.com/bluesky-social/indigo/events"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 "github.com/bluesky-social/indigo/repo"
18 "github.com/bluesky-social/indigo/util"
19 "github.com/haileyok/cocoon/blockstore"
20 "github.com/haileyok/cocoon/models"
21 blocks "github.com/ipfs/go-block-format"
22 "github.com/ipfs/go-cid"
23 cbor "github.com/ipfs/go-ipld-cbor"
24 "github.com/ipld/go-car"
25 "gorm.io/gorm"
26 "gorm.io/gorm/clause"
27)
28
29type RepoMan struct {
30 db *gorm.DB
31 s *Server
32 clock *syntax.TIDClock
33}
34
35func NewRepoMan(s *Server) *RepoMan {
36 clock := syntax.NewTIDClock(0)
37
38 return &RepoMan{
39 s: s,
40 db: s.db,
41 clock: &clock,
42 }
43}
44
45type OpType string
46
47var (
48 OpTypeCreate = OpType("com.atproto.repo.applyWrites#create")
49 OpTypeUpdate = OpType("com.atproto.repo.applyWrites#update")
50 OpTypeDelete = OpType("com.atproto.repo.applyWrites#delete")
51)
52
53func (ot OpType) String() string {
54 return ot.String()
55}
56
57type Op struct {
58 Type OpType `json:"$type"`
59 Collection string `json:"collection"`
60 Rkey *string `json:"rkey,omitempty"`
61 Validate *bool `json:"validate,omitempty"`
62 SwapRecord *string `json:"swapRecord,omitempty"`
63 Record *MarshalableMap `json:"record,omitempty"`
64}
65
66type MarshalableMap map[string]any
67
68type FirehoseOp struct {
69 Cid cid.Cid
70 Path string
71 Action string
72}
73
74func (mm *MarshalableMap) MarshalCBOR(w io.Writer) error {
75 data, err := data.MarshalCBOR(*mm)
76 if err != nil {
77 return err
78 }
79
80 w.Write(data)
81
82 return nil
83}
84
85type ApplyWriteResult struct {
86 Type string `json:"$type,omitempty"`
87 Uri string `json:"uri"`
88 Cid string `json:"cid"`
89 Commit *RepoCommit `json:"commit,omitempty"`
90 ValidationStatus *string `json:"validationStatus"`
91}
92
93type RepoCommit struct {
94 Cid string `json:"cid"`
95 Rev string `json:"rev"`
96}
97
98// TODO make use of swap commit
99func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
100 rootcid, err := cid.Cast(urepo.Root)
101 if err != nil {
102 return nil, err
103 }
104
105 dbs := blockstore.New(urepo.Did, rm.db)
106 r, err := repo.OpenRepo(context.TODO(), dbs, rootcid)
107
108 entries := []models.Record{}
109
110 for i, op := range writes {
111 if op.Type != OpTypeCreate && op.Rkey == nil {
112 return nil, fmt.Errorf("invalid rkey")
113 } else if op.Rkey == nil {
114 op.Rkey = to.StringPtr(rm.clock.Next().String())
115 writes[i].Rkey = op.Rkey
116 }
117
118 _, err := syntax.ParseRecordKey(*op.Rkey)
119 if err != nil {
120 return nil, err
121 }
122
123 switch op.Type {
124 case OpTypeCreate:
125 nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
126 if err != nil {
127 return nil, err
128 }
129
130 d, _ := data.MarshalCBOR(*op.Record)
131 entries = append(entries, models.Record{
132 Did: urepo.Did,
133 CreatedAt: rm.clock.Next().String(),
134 Nsid: op.Collection,
135 Rkey: *op.Rkey,
136 Cid: nc.String(),
137 Value: d,
138 })
139 case OpTypeDelete:
140 err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
141 if err != nil {
142 return nil, err
143 }
144 case OpTypeUpdate:
145 nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
146 if err != nil {
147 return nil, err
148 }
149
150 d, _ := data.MarshalCBOR(*op.Record)
151 entries = append(entries, models.Record{
152 Did: urepo.Did,
153 CreatedAt: rm.clock.Next().String(),
154 Nsid: op.Collection,
155 Rkey: *op.Rkey,
156 Cid: nc.String(),
157 Value: d,
158 })
159 }
160 }
161
162 newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor)
163 if err != nil {
164 return nil, err
165 }
166
167 buf := new(bytes.Buffer)
168
169 hb, err := cbor.DumpObject(&car.CarHeader{
170 Roots: []cid.Cid{newroot},
171 Version: 1,
172 })
173
174 if _, err := carstore.LdWrite(buf, hb); err != nil {
175 return nil, err
176 }
177
178 diffops, err := r.DiffSince(context.TODO(), rootcid)
179 if err != nil {
180 return nil, err
181 }
182
183 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops))
184
185 for _, op := range diffops {
186 switch op.Op {
187 case "add", "mut":
188 kind := "create"
189 if op.Op == "mut" {
190 kind = "update"
191 }
192
193 ll := lexutil.LexLink(op.NewCid)
194 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{
195 Action: kind,
196 Path: op.Rpath,
197 Cid: &ll,
198 })
199
200 case "del":
201 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{
202 Action: "delete",
203 Path: op.Rpath,
204 Cid: nil,
205 })
206 }
207
208 blk, err := dbs.Get(context.TODO(), op.NewCid)
209 if err != nil {
210 return nil, err
211 }
212
213 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
214 return nil, err
215 }
216 }
217
218 for _, op := range dbs.GetLog() {
219 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
220 return nil, err
221 }
222 }
223
224 var results []ApplyWriteResult
225
226 var blobs []lexutil.LexLink
227 for _, entry := range entries {
228 if err := rm.s.db.Clauses(clause.OnConflict{
229 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
230 UpdateAll: true,
231 }).Create(&entry).Error; err != nil {
232 return nil, err
233 }
234
235 // we should actually check the type (i.e. delete, create,., update) here but we'll do it later
236 cids, err := rm.incrementBlobRefs(urepo, entry.Value)
237 if err != nil {
238 return nil, err
239 }
240
241 for _, c := range cids {
242 blobs = append(blobs, lexutil.LexLink(c))
243 }
244
245 results = append(results, ApplyWriteResult{
246 Uri: "at://" + urepo.Did + "/" + entry.Nsid + "/" + entry.Rkey,
247 Cid: entry.Cid,
248 Commit: &RepoCommit{
249 Cid: newroot.String(),
250 Rev: rev,
251 },
252 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
253 })
254 }
255
256 rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
257 RepoCommit: &atproto.SyncSubscribeRepos_Commit{
258 Repo: urepo.Did,
259 Blocks: buf.Bytes(),
260 Blobs: blobs,
261 Rev: rev,
262 Since: &urepo.Rev,
263 Commit: lexutil.LexLink(newroot),
264 Time: time.Now().Format(util.ISO8601),
265 Ops: ops,
266 TooBig: false,
267 },
268 })
269
270 if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil {
271 return nil, err
272 }
273
274 return results, nil
275}
276
277func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
278 c, err := cid.Cast(urepo.Root)
279 if err != nil {
280 return cid.Undef, nil, err
281 }
282
283 dbs := blockstore.New(urepo.Did, rm.db)
284 bs := util.NewLoggingBstore(dbs)
285
286 r, err := repo.OpenRepo(context.TODO(), bs, c)
287 if err != nil {
288 return cid.Undef, nil, err
289 }
290
291 _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey)
292 if err != nil {
293 return cid.Undef, nil, err
294 }
295
296 return c, bs.GetLoggedBlocks(), nil
297}
298
299func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
300 cids, err := getBlobCidsFromCbor(cbor)
301 if err != nil {
302 return nil, err
303 }
304
305 for _, c := range cids {
306 if err := rm.db.Exec("UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", urepo.Did, c.Bytes()).Error; err != nil {
307 return nil, err
308 }
309 }
310
311 return cids, nil
312}
313
314// to be honest, we could just store both the cbor and non-cbor in []entries above to avoid an additional
315// unmarshal here. this will work for now though
316func getBlobCidsFromCbor(cbor []byte) ([]cid.Cid, error) {
317 var cids []cid.Cid
318
319 decoded, err := data.UnmarshalCBOR(cbor)
320 if err != nil {
321 return nil, fmt.Errorf("error unmarshaling cbor: %w", err)
322 }
323
324 var deepiter func(interface{}) error
325 deepiter = func(item interface{}) error {
326 switch val := item.(type) {
327 case map[string]interface{}:
328 if val["$type"] == "blob" {
329 if ref, ok := val["ref"].(string); ok {
330 c, err := cid.Parse(ref)
331 if err != nil {
332 return err
333 }
334 cids = append(cids, c)
335 }
336 for _, v := range val {
337 return deepiter(v)
338 }
339 }
340 case []interface{}:
341 for _, v := range val {
342 deepiter(v)
343 }
344 }
345
346 return nil
347 }
348
349 if err := deepiter(decoded); err != nil {
350 return nil, err
351 }
352
353 return cids, nil
354}