An atproto PDS written in Go
1package server 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "time" 9 10 "github.com/Azure/go-autorest/autorest/to" 11 "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/atproto/data" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/carstore" 15 "github.com/bluesky-social/indigo/events" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 "github.com/bluesky-social/indigo/repo" 18 "github.com/bluesky-social/indigo/util" 19 "github.com/haileyok/cocoon/blockstore" 20 "github.com/haileyok/cocoon/models" 21 blocks "github.com/ipfs/go-block-format" 22 "github.com/ipfs/go-cid" 23 cbor "github.com/ipfs/go-ipld-cbor" 24 "github.com/ipld/go-car" 25 "gorm.io/gorm" 26 "gorm.io/gorm/clause" 27) 28 29type RepoMan struct { 30 db *gorm.DB 31 s *Server 32 clock *syntax.TIDClock 33} 34 35func NewRepoMan(s *Server) *RepoMan { 36 clock := syntax.NewTIDClock(0) 37 38 return &RepoMan{ 39 s: s, 40 db: s.db, 41 clock: &clock, 42 } 43} 44 45type OpType string 46 47var ( 48 OpTypeCreate = OpType("com.atproto.repo.applyWrites#create") 49 OpTypeUpdate = OpType("com.atproto.repo.applyWrites#update") 50 OpTypeDelete = OpType("com.atproto.repo.applyWrites#delete") 51) 52 53func (ot OpType) String() string { 54 return ot.String() 55} 56 57type Op struct { 58 Type OpType `json:"$type"` 59 Collection string `json:"collection"` 60 Rkey *string `json:"rkey,omitempty"` 61 Validate *bool `json:"validate,omitempty"` 62 SwapRecord *string `json:"swapRecord,omitempty"` 63 Record *MarshalableMap `json:"record,omitempty"` 64} 65 66type MarshalableMap map[string]any 67 68type FirehoseOp struct { 69 Cid cid.Cid 70 Path string 71 Action string 72} 73 74func (mm *MarshalableMap) MarshalCBOR(w io.Writer) error { 75 data, err := data.MarshalCBOR(*mm) 76 if err != nil { 77 return err 78 } 79 80 w.Write(data) 81 82 return nil 83} 84 85type ApplyWriteResult struct { 86 Uri string `json:"uri"` 87 Cid string `json:"cid"` 88 Commit *RepoCommit `json:"commit"` 89 ValidationStatus *string `json:"validationStatus"` 90} 91 92type RepoCommit struct { 93 Cid string `json:"cid"` 94 Rev string `json:"rev"` 95} 96 97// TODO make use of swap commit 98func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { 99 rootcid, err := cid.Cast(urepo.Root) 100 if err != nil { 101 return nil, err 102 } 103 104 dbs := blockstore.New(urepo.Did, rm.db) 105 r, err := repo.OpenRepo(context.TODO(), dbs, rootcid) 106 107 entries := []models.Record{} 108 109 for i, op := range writes { 110 if op.Type != OpTypeCreate && op.Rkey == nil { 111 return nil, fmt.Errorf("invalid rkey") 112 } else if op.Rkey == nil { 113 op.Rkey = to.StringPtr(rm.clock.Next().String()) 114 writes[i].Rkey = op.Rkey 115 } 116 117 _, err := syntax.ParseRecordKey(*op.Rkey) 118 if err != nil { 119 return nil, err 120 } 121 122 switch op.Type { 123 case OpTypeCreate: 124 nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record) 125 if err != nil { 126 return nil, err 127 } 128 129 d, _ := data.MarshalCBOR(*op.Record) 130 entries = append(entries, models.Record{ 131 Did: urepo.Did, 132 CreatedAt: rm.clock.Next().String(), 133 Nsid: op.Collection, 134 Rkey: *op.Rkey, 135 Cid: nc.String(), 136 Value: d, 137 }) 138 case OpTypeDelete: 139 err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey) 140 if err != nil { 141 return nil, err 142 } 143 case OpTypeUpdate: 144 nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record) 145 if err != nil { 146 return nil, err 147 } 148 149 d, _ := data.MarshalCBOR(*op.Record) 150 entries = append(entries, models.Record{ 151 Did: urepo.Did, 152 CreatedAt: rm.clock.Next().String(), 153 Nsid: op.Collection, 154 Rkey: *op.Rkey, 155 Cid: nc.String(), 156 Value: d, 157 }) 158 } 159 } 160 161 newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor) 162 if err != nil { 163 return nil, err 164 } 165 166 buf := new(bytes.Buffer) 167 168 hb, err := cbor.DumpObject(&car.CarHeader{ 169 Roots: []cid.Cid{newroot}, 170 Version: 1, 171 }) 172 173 if _, err := carstore.LdWrite(buf, hb); err != nil { 174 return nil, err 175 } 176 177 diffops, err := r.DiffSince(context.TODO(), rootcid) 178 if err != nil { 179 return nil, err 180 } 181 182 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops)) 183 184 for _, op := range diffops { 185 switch op.Op { 186 case "add", "mut": 187 kind := "create" 188 if op.Op == "mut" { 189 kind = "update" 190 } 191 192 ll := lexutil.LexLink(op.NewCid) 193 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{ 194 Action: kind, 195 Path: op.Rpath, 196 Cid: &ll, 197 }) 198 199 case "del": 200 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{ 201 Action: "delete", 202 Path: op.Rpath, 203 Cid: nil, 204 }) 205 } 206 207 blk, err := dbs.Get(context.TODO(), op.NewCid) 208 if err != nil { 209 return nil, err 210 } 211 212 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 213 return nil, err 214 } 215 } 216 217 for _, op := range dbs.GetLog() { 218 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { 219 return nil, err 220 } 221 } 222 223 var results []ApplyWriteResult 224 225 var blobs []lexutil.LexLink 226 for _, entry := range entries { 227 if err := rm.s.db.Clauses(clause.OnConflict{ 228 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}}, 229 UpdateAll: true, 230 }).Create(&entry).Error; err != nil { 231 return nil, err 232 } 233 234 // we should actually check the type (i.e. delete, create,., update) here but we'll do it later 235 cids, err := rm.incrementBlobRefs(urepo, entry.Value) 236 if err != nil { 237 return nil, err 238 } 239 240 for _, c := range cids { 241 blobs = append(blobs, lexutil.LexLink(c)) 242 } 243 244 results = append(results, ApplyWriteResult{ 245 Uri: "at://" + urepo.Did + "/" + entry.Nsid + "/" + entry.Rkey, 246 Cid: entry.Cid, 247 Commit: &RepoCommit{ 248 Cid: newroot.String(), 249 Rev: rev, 250 }, 251 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol 252 }) 253 } 254 255 rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{ 256 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 257 Repo: urepo.Did, 258 Blocks: buf.Bytes(), 259 Blobs: blobs, 260 Rev: rev, 261 Since: &urepo.Rev, 262 Commit: lexutil.LexLink(newroot), 263 Time: time.Now().Format(util.ISO8601), 264 Ops: ops, 265 TooBig: false, 266 }, 267 }) 268 269 if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil { 270 return nil, err 271 } 272 273 return results, nil 274} 275 276func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { 277 c, err := cid.Cast(urepo.Root) 278 if err != nil { 279 return cid.Undef, nil, err 280 } 281 282 dbs := blockstore.New(urepo.Did, rm.db) 283 bs := util.NewLoggingBstore(dbs) 284 285 r, err := repo.OpenRepo(context.TODO(), bs, c) 286 if err != nil { 287 return cid.Undef, nil, err 288 } 289 290 _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey) 291 if err != nil { 292 return cid.Undef, nil, err 293 } 294 295 return c, bs.GetLoggedBlocks(), nil 296} 297 298func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) { 299 cids, err := getBlobCidsFromCbor(cbor) 300 if err != nil { 301 return nil, err 302 } 303 304 for _, c := range cids { 305 if err := rm.db.Exec("UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", urepo.Did, c.Bytes()).Error; err != nil { 306 return nil, err 307 } 308 } 309 310 return cids, nil 311} 312 313// to be honest, we could just store both the cbor and non-cbor in []entries above to avoid an additional 314// unmarshal here. this will work for now though 315func getBlobCidsFromCbor(cbor []byte) ([]cid.Cid, error) { 316 var cids []cid.Cid 317 318 decoded, err := data.UnmarshalCBOR(cbor) 319 if err != nil { 320 return nil, fmt.Errorf("error unmarshaling cbor: %w", err) 321 } 322 323 var deepiter func(interface{}) error 324 deepiter = func(item interface{}) error { 325 switch val := item.(type) { 326 case map[string]interface{}: 327 if val["$type"] == "blob" { 328 if ref, ok := val["ref"].(string); ok { 329 c, err := cid.Parse(ref) 330 if err != nil { 331 return err 332 } 333 cids = append(cids, c) 334 } 335 for _, v := range val { 336 return deepiter(v) 337 } 338 } 339 case []interface{}: 340 for _, v := range val { 341 deepiter(v) 342 } 343 } 344 345 return nil 346 } 347 348 if err := deepiter(decoded); err != nil { 349 return nil, err 350 } 351 352 return cids, nil 353}