An atproto PDS written in Go
1package server 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "time" 9 10 "github.com/Azure/go-autorest/autorest/to" 11 "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/atproto/data" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/carstore" 15 "github.com/bluesky-social/indigo/events" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 "github.com/bluesky-social/indigo/repo" 18 "github.com/bluesky-social/indigo/util" 19 "github.com/haileyok/cocoon/blockstore" 20 "github.com/haileyok/cocoon/models" 21 blocks "github.com/ipfs/go-block-format" 22 "github.com/ipfs/go-cid" 23 cbor "github.com/ipfs/go-ipld-cbor" 24 "github.com/ipld/go-car" 25 "gorm.io/gorm" 26 "gorm.io/gorm/clause" 27) 28 29type RepoMan struct { 30 db *gorm.DB 31 s *Server 32 clock *syntax.TIDClock 33} 34 35func NewRepoMan(s *Server) *RepoMan { 36 clock := syntax.NewTIDClock(0) 37 38 return &RepoMan{ 39 s: s, 40 db: s.db, 41 clock: &clock, 42 } 43} 44 45type OpType string 46 47var ( 48 OpTypeCreate = OpType("com.atproto.repo.applyWrites#create") 49 OpTypeUpdate = OpType("com.atproto.repo.applyWrites#update") 50 OpTypeDelete = OpType("com.atproto.repo.applyWrites#delete") 51) 52 53func (ot OpType) String() string { 54 return ot.String() 55} 56 57type Op struct { 58 Type OpType `json:"$type"` 59 Collection string `json:"collection"` 60 Rkey *string `json:"rkey,omitempty"` 61 Validate *bool `json:"validate,omitempty"` 62 SwapRecord *string `json:"swapRecord,omitempty"` 63 Record *MarshalableMap `json:"record,omitempty"` 64} 65 66type MarshalableMap map[string]any 67 68type FirehoseOp struct { 69 Cid cid.Cid 70 Path string 71 Action string 72} 73 74func (mm *MarshalableMap) MarshalCBOR(w io.Writer) error { 75 data, err := data.MarshalCBOR(*mm) 76 if err != nil { 77 return err 78 } 79 80 w.Write(data) 81 82 return nil 83} 84 85type ApplyWriteResult struct { 86 Type string `json:"$type,omitempty"` 87 Uri string `json:"uri"` 88 Cid string `json:"cid"` 89 Commit *RepoCommit `json:"commit,omitempty"` 90 ValidationStatus *string `json:"validationStatus"` 91} 92 93type RepoCommit struct { 94 Cid string `json:"cid"` 95 Rev string `json:"rev"` 96} 97 98// TODO make use of swap commit 99func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { 100 rootcid, err := cid.Cast(urepo.Root) 101 if err != nil { 102 return nil, err 103 } 104 105 dbs := blockstore.New(urepo.Did, rm.db) 106 r, err := repo.OpenRepo(context.TODO(), dbs, rootcid) 107 108 entries := []models.Record{} 109 110 for i, op := range writes { 111 if op.Type != OpTypeCreate && op.Rkey == nil { 112 return nil, fmt.Errorf("invalid rkey") 113 } else if op.Rkey == nil { 114 op.Rkey = to.StringPtr(rm.clock.Next().String()) 115 writes[i].Rkey = op.Rkey 116 } 117 118 _, err := syntax.ParseRecordKey(*op.Rkey) 119 if err != nil { 120 return nil, err 121 } 122 123 switch op.Type { 124 case OpTypeCreate: 125 nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record) 126 if err != nil { 127 return nil, err 128 } 129 130 d, _ := data.MarshalCBOR(*op.Record) 131 entries = append(entries, models.Record{ 132 Did: urepo.Did, 133 CreatedAt: rm.clock.Next().String(), 134 Nsid: op.Collection, 135 Rkey: *op.Rkey, 136 Cid: nc.String(), 137 Value: d, 138 }) 139 case OpTypeDelete: 140 err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey) 141 if err != nil { 142 return nil, err 143 } 144 case OpTypeUpdate: 145 nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record) 146 if err != nil { 147 return nil, err 148 } 149 150 d, _ := data.MarshalCBOR(*op.Record) 151 entries = append(entries, models.Record{ 152 Did: urepo.Did, 153 CreatedAt: rm.clock.Next().String(), 154 Nsid: op.Collection, 155 Rkey: *op.Rkey, 156 Cid: nc.String(), 157 Value: d, 158 }) 159 } 160 } 161 162 newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor) 163 if err != nil { 164 return nil, err 165 } 166 167 buf := new(bytes.Buffer) 168 169 hb, err := cbor.DumpObject(&car.CarHeader{ 170 Roots: []cid.Cid{newroot}, 171 Version: 1, 172 }) 173 174 if _, err := carstore.LdWrite(buf, hb); err != nil { 175 return nil, err 176 } 177 178 diffops, err := r.DiffSince(context.TODO(), rootcid) 179 if err != nil { 180 return nil, err 181 } 182 183 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops)) 184 185 for _, op := range diffops { 186 switch op.Op { 187 case "add", "mut": 188 kind := "create" 189 if op.Op == "mut" { 190 kind = "update" 191 } 192 193 ll := lexutil.LexLink(op.NewCid) 194 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{ 195 Action: kind, 196 Path: op.Rpath, 197 Cid: &ll, 198 }) 199 200 case "del": 201 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{ 202 Action: "delete", 203 Path: op.Rpath, 204 Cid: nil, 205 }) 206 } 207 208 blk, err := dbs.Get(context.TODO(), op.NewCid) 209 if err != nil { 210 return nil, err 211 } 212 213 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 214 return nil, err 215 } 216 } 217 218 for _, op := range dbs.GetLog() { 219 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { 220 return nil, err 221 } 222 } 223 224 var results []ApplyWriteResult 225 226 var blobs []lexutil.LexLink 227 for _, entry := range entries { 228 if err := rm.s.db.Clauses(clause.OnConflict{ 229 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}}, 230 UpdateAll: true, 231 }).Create(&entry).Error; err != nil { 232 return nil, err 233 } 234 235 // we should actually check the type (i.e. delete, create,., update) here but we'll do it later 236 cids, err := rm.incrementBlobRefs(urepo, entry.Value) 237 if err != nil { 238 return nil, err 239 } 240 241 for _, c := range cids { 242 blobs = append(blobs, lexutil.LexLink(c)) 243 } 244 245 results = append(results, ApplyWriteResult{ 246 Uri: "at://" + urepo.Did + "/" + entry.Nsid + "/" + entry.Rkey, 247 Cid: entry.Cid, 248 Commit: &RepoCommit{ 249 Cid: newroot.String(), 250 Rev: rev, 251 }, 252 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol 253 }) 254 } 255 256 rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{ 257 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 258 Repo: urepo.Did, 259 Blocks: buf.Bytes(), 260 Blobs: blobs, 261 Rev: rev, 262 Since: &urepo.Rev, 263 Commit: lexutil.LexLink(newroot), 264 Time: time.Now().Format(util.ISO8601), 265 Ops: ops, 266 TooBig: false, 267 }, 268 }) 269 270 if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil { 271 return nil, err 272 } 273 274 return results, nil 275} 276 277func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { 278 c, err := cid.Cast(urepo.Root) 279 if err != nil { 280 return cid.Undef, nil, err 281 } 282 283 dbs := blockstore.New(urepo.Did, rm.db) 284 bs := util.NewLoggingBstore(dbs) 285 286 r, err := repo.OpenRepo(context.TODO(), bs, c) 287 if err != nil { 288 return cid.Undef, nil, err 289 } 290 291 _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey) 292 if err != nil { 293 return cid.Undef, nil, err 294 } 295 296 return c, bs.GetLoggedBlocks(), nil 297} 298 299func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) { 300 cids, err := getBlobCidsFromCbor(cbor) 301 if err != nil { 302 return nil, err 303 } 304 305 for _, c := range cids { 306 if err := rm.db.Exec("UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", urepo.Did, c.Bytes()).Error; err != nil { 307 return nil, err 308 } 309 } 310 311 return cids, nil 312} 313 314// to be honest, we could just store both the cbor and non-cbor in []entries above to avoid an additional 315// unmarshal here. this will work for now though 316func getBlobCidsFromCbor(cbor []byte) ([]cid.Cid, error) { 317 var cids []cid.Cid 318 319 decoded, err := data.UnmarshalCBOR(cbor) 320 if err != nil { 321 return nil, fmt.Errorf("error unmarshaling cbor: %w", err) 322 } 323 324 var deepiter func(interface{}) error 325 deepiter = func(item interface{}) error { 326 switch val := item.(type) { 327 case map[string]interface{}: 328 if val["$type"] == "blob" { 329 if ref, ok := val["ref"].(string); ok { 330 c, err := cid.Parse(ref) 331 if err != nil { 332 return err 333 } 334 cids = append(cids, c) 335 } 336 for _, v := range val { 337 return deepiter(v) 338 } 339 } 340 case []interface{}: 341 for _, v := range val { 342 deepiter(v) 343 } 344 } 345 346 return nil 347 } 348 349 if err := deepiter(decoded); err != nil { 350 return nil, err 351 } 352 353 return cids, nil 354}