An atproto PDS written in Go
1package server 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "time" 9 10 "github.com/Azure/go-autorest/autorest/to" 11 "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/atproto/data" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/carstore" 15 "github.com/bluesky-social/indigo/events" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 "github.com/bluesky-social/indigo/repo" 18 "github.com/bluesky-social/indigo/util" 19 "github.com/haileyok/cocoon/blockstore" 20 "github.com/haileyok/cocoon/models" 21 blocks "github.com/ipfs/go-block-format" 22 "github.com/ipfs/go-cid" 23 cbor "github.com/ipfs/go-ipld-cbor" 24 "github.com/ipld/go-car" 25 "gorm.io/gorm" 26 "gorm.io/gorm/clause" 27) 28 29type RepoMan struct { 30 db *gorm.DB 31 s *Server 32 clock *syntax.TIDClock 33} 34 35func NewRepoMan(s *Server) *RepoMan { 36 clock := syntax.NewTIDClock(0) 37 38 return &RepoMan{ 39 s: s, 40 db: s.db, 41 clock: &clock, 42 } 43} 44 45type OpType string 46 47var ( 48 OpTypeCreate = OpType("com.atproto.repo.applyWrites#create") 49 OpTypeUpdate = OpType("com.atproto.repo.applyWrites#update") 50 OpTypeDelete = OpType("com.atproto.repo.applyWrites#delete") 51) 52 53func (ot OpType) String() string { 54 return ot.String() 55} 56 57type Op struct { 58 Type OpType `json:"$type"` 59 Collection string `json:"collection"` 60 Rkey *string `json:"rkey,omitempty"` 61 Validate *bool `json:"validate,omitempty"` 62 SwapRecord *string `json:"swapRecord,omitempty"` 63 Record *MarshalableMap `json:"record,omitempty"` 64} 65 66type MarshalableMap map[string]any 67 68type FirehoseOp struct { 69 Cid cid.Cid 70 Path string 71 Action string 72} 73 74func (mm *MarshalableMap) MarshalCBOR(w io.Writer) error { 75 data, err := data.MarshalCBOR(*mm) 76 if err != nil { 77 return err 78 } 79 80 w.Write(data) 81 82 return nil 83} 84 85// TODO make use of swap commit 86func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) error { 87 rootcid, err := cid.Cast(urepo.Root) 88 if err != nil { 89 return err 90 } 91 92 dbs := blockstore.New(urepo.Did, rm.db) 93 r, err := repo.OpenRepo(context.TODO(), dbs, rootcid) 94 95 entries := []models.Record{} 96 97 for i, op := range writes { 98 if op.Type != OpTypeCreate && op.Rkey == nil { 99 return fmt.Errorf("invalid rkey") 100 } else if op.Rkey == nil { 101 op.Rkey = to.StringPtr(rm.clock.Next().String()) 102 writes[i].Rkey = op.Rkey 103 } 104 105 _, err := syntax.ParseRecordKey(*op.Rkey) 106 if err != nil { 107 return err 108 } 109 110 switch op.Type { 111 case OpTypeCreate: 112 nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record) 113 if err != nil { 114 return err 115 } 116 117 d, _ := data.MarshalCBOR(*op.Record) 118 entries = append(entries, models.Record{ 119 Did: urepo.Did, 120 CreatedAt: rm.clock.Next().String(), 121 Nsid: op.Collection, 122 Rkey: *op.Rkey, 123 Cid: nc.String(), 124 Value: d, 125 }) 126 case OpTypeDelete: 127 err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey) 128 if err != nil { 129 return err 130 } 131 case OpTypeUpdate: 132 nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record) 133 if err != nil { 134 return err 135 } 136 137 d, _ := data.MarshalCBOR(*op.Record) 138 entries = append(entries, models.Record{ 139 Did: urepo.Did, 140 CreatedAt: rm.clock.Next().String(), 141 Nsid: op.Collection, 142 Rkey: *op.Rkey, 143 Cid: nc.String(), 144 Value: d, 145 }) 146 } 147 } 148 149 newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor) 150 if err != nil { 151 return err 152 } 153 154 buf := new(bytes.Buffer) 155 156 hb, err := cbor.DumpObject(&car.CarHeader{ 157 Roots: []cid.Cid{newroot}, 158 Version: 1, 159 }) 160 161 if _, err := carstore.LdWrite(buf, hb); err != nil { 162 return err 163 } 164 165 diffops, err := r.DiffSince(context.TODO(), rootcid) 166 if err != nil { 167 return err 168 } 169 170 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops)) 171 172 for _, op := range diffops { 173 switch op.Op { 174 case "add", "mut": 175 kind := "create" 176 if op.Op == "mut" { 177 kind = "update" 178 } 179 180 ll := lexutil.LexLink(op.NewCid) 181 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{ 182 Action: kind, 183 Path: op.Rpath, 184 Cid: &ll, 185 }) 186 187 case "del": 188 ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{ 189 Action: "delete", 190 Path: op.Rpath, 191 Cid: nil, 192 }) 193 } 194 195 blk, err := dbs.Get(context.TODO(), op.NewCid) 196 if err != nil { 197 return err 198 } 199 200 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 201 return err 202 } 203 } 204 205 for _, op := range dbs.GetLog() { 206 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { 207 return err 208 } 209 } 210 211 var blobs []lexutil.LexLink 212 for _, entry := range entries { 213 if err := rm.s.db.Clauses(clause.OnConflict{ 214 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}}, 215 UpdateAll: true, 216 }).Create(&entry).Error; err != nil { 217 return err 218 } 219 220 // we should actually check the type (i.e. delete, create,., update) here but we'll do it later 221 cids, err := rm.incrementBlobRefs(urepo, entry.Value) 222 if err != nil { 223 return err 224 } 225 226 for _, c := range cids { 227 blobs = append(blobs, lexutil.LexLink(c)) 228 } 229 } 230 231 rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{ 232 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 233 Repo: urepo.Did, 234 Blocks: buf.Bytes(), 235 Blobs: blobs, 236 Rev: rev, 237 Since: &urepo.Rev, 238 Commit: lexutil.LexLink(newroot), 239 Time: time.Now().Format(util.ISO8601), 240 Ops: ops, 241 TooBig: false, 242 }, 243 }) 244 245 if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil { 246 return err 247 } 248 249 return nil 250} 251 252func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { 253 c, err := cid.Cast(urepo.Root) 254 if err != nil { 255 return cid.Undef, nil, err 256 } 257 258 dbs := blockstore.New(urepo.Did, rm.db) 259 bs := util.NewLoggingBstore(dbs) 260 261 r, err := repo.OpenRepo(context.TODO(), bs, c) 262 if err != nil { 263 return cid.Undef, nil, err 264 } 265 266 _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey) 267 if err != nil { 268 return cid.Undef, nil, err 269 } 270 271 return c, bs.GetLoggedBlocks(), nil 272} 273 274func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) { 275 cids, err := getBlobCidsFromCbor(cbor) 276 if err != nil { 277 return nil, err 278 } 279 280 for _, c := range cids { 281 if err := rm.db.Exec("UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", urepo.Did, c.Bytes()).Error; err != nil { 282 return nil, err 283 } 284 } 285 286 return cids, nil 287} 288 289// to be honest, we could just store both the cbor and non-cbor in []entries above to avoid an additional 290// unmarshal here. this will work for now though 291func getBlobCidsFromCbor(cbor []byte) ([]cid.Cid, error) { 292 var cids []cid.Cid 293 294 decoded, err := data.UnmarshalCBOR(cbor) 295 if err != nil { 296 return nil, fmt.Errorf("error unmarshaling cbor: %w", err) 297 } 298 299 var deepiter func(interface{}) error 300 deepiter = func(item interface{}) error { 301 switch val := item.(type) { 302 case map[string]interface{}: 303 if val["$type"] == "blob" { 304 if ref, ok := val["ref"].(string); ok { 305 c, err := cid.Parse(ref) 306 if err != nil { 307 return err 308 } 309 cids = append(cids, c) 310 } 311 for _, v := range val { 312 return deepiter(v) 313 } 314 } 315 case []interface{}: 316 for _, v := range val { 317 deepiter(v) 318 } 319 } 320 321 return nil 322 } 323 324 if err := deepiter(decoded); err != nil { 325 return nil, err 326 } 327 328 return cids, nil 329}