An atproto PDS written in Go

com.atproto.repo.deleteRecord (#1)

* deleteRecord

* tweak

* fixes

Changed files
+126 -17
server
+55
server/handle_repo_delete_record.go
···
···
+
package server
+
+
import (
+
"github.com/haileyok/cocoon/internal/helpers"
+
"github.com/haileyok/cocoon/models"
+
"github.com/labstack/echo/v4"
+
)
+
+
type ComAtprotoRepoDeleteRecordRequest struct {
+
Repo string `json:"repo" validate:"required,atproto-did"`
+
Collection string `json:"collection" validate:"required,atproto-nsid"`
+
Rkey string `json:"rkey" validate:"required,atproto-rkey"`
+
SwapRecord *string `json:"swapRecord"`
+
SwapCommit *string `json:"swapCommit"`
+
}
+
+
func (s *Server) handleDeleteRecord(e echo.Context) error {
+
repo := e.Get("repo").(*models.RepoActor)
+
+
var req ComAtprotoRepoDeleteRecordRequest
+
if err := e.Bind(&req); err != nil {
+
s.logger.Error("error binding", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
if err := e.Validate(req); err != nil {
+
s.logger.Error("error validating", "error", err)
+
return helpers.InputError(e, nil)
+
}
+
+
if repo.Repo.Did != req.Repo {
+
s.logger.Warn("mismatched repo/auth")
+
return helpers.InputError(e, nil)
+
}
+
+
results, err := s.repoman.applyWrites(repo.Repo, []Op{
+
{
+
Type: OpTypeDelete,
+
Collection: req.Collection,
+
Rkey: &req.Rkey,
+
SwapRecord: req.SwapRecord,
+
},
+
}, req.SwapCommit)
+
if err != nil {
+
s.logger.Error("error applying writes", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
results[0].Type = nil
+
results[0].Uri = nil
+
results[0].Cid = nil
+
results[0].ValidationStatus = nil
+
+
return e.JSON(200, results[0])
+
}
+70 -17
server/repo.go
···
type ApplyWriteResult struct {
Type *string `json:"$type,omitempty"`
-
Uri string `json:"uri"`
-
Cid string `json:"cid"`
Commit *RepoCommit `json:"commit,omitempty"`
-
ValidationStatus *string `json:"validationStatus"`
}
type RepoCommit struct {
···
})
results = append(results, ApplyWriteResult{
Type: to.StringPtr(OpTypeCreate.String()),
-
Uri: "at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey,
-
Cid: nc.String(),
ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
})
case OpTypeDelete:
err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
if err != nil {
return nil, err
}
case OpTypeUpdate:
nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
if err != nil {
···
})
results = append(results, ApplyWriteResult{
Type: to.StringPtr(OpTypeUpdate.String()),
-
Uri: "at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey,
-
Cid: nc.String(),
ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
})
}
···
})
case "del":
ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{
Action: "delete",
Path: op.Rpath,
Cid: nil,
})
}
···
var blobs []lexutil.LexLink
for _, entry := range entries {
-
if err := rm.s.db.Clauses(clause.OnConflict{
-
Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
-
UpdateAll: true,
-
}).Create(&entry).Error; err != nil {
-
return nil, err
-
}
-
// we should actually check the type (i.e. delete, create,., update) here but we'll do it later
-
cids, err := rm.incrementBlobRefs(urepo, entry.Value)
-
if err != nil {
-
return nil, err
}
for _, c := range cids {
···
for _, c := range cids {
if err := rm.db.Exec("UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", urepo.Did, c.Bytes()).Error; err != nil {
return nil, err
}
}
···
type ApplyWriteResult struct {
Type *string `json:"$type,omitempty"`
+
Uri *string `json:"uri,omitempty"`
+
Cid *string `json:"cid,omitempty"`
Commit *RepoCommit `json:"commit,omitempty"`
+
ValidationStatus *string `json:"validationStatus,omitempty"`
}
type RepoCommit struct {
···
})
results = append(results, ApplyWriteResult{
Type: to.StringPtr(OpTypeCreate.String()),
+
Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey),
+
Cid: to.StringPtr(nc.String()),
ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
})
case OpTypeDelete:
+
var old models.Record
+
if err := rm.db.Raw("SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil {
+
return nil, err
+
}
+
entries = append(entries, models.Record{
+
Did: urepo.Did,
+
Nsid: op.Collection,
+
Rkey: *op.Rkey,
+
Value: old.Value,
+
})
err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
if err != nil {
return nil, err
}
+
results = append(results, ApplyWriteResult{
+
Type: to.StringPtr(OpTypeDelete.String()),
+
})
case OpTypeUpdate:
nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
if err != nil {
···
})
results = append(results, ApplyWriteResult{
Type: to.StringPtr(OpTypeUpdate.String()),
+
Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey),
+
Cid: to.StringPtr(nc.String()),
ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
})
}
···
})
case "del":
+
ll := lexutil.LexLink(op.OldCid)
ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{
Action: "delete",
Path: op.Rpath,
Cid: nil,
+
Prev: &ll,
})
}
···
var blobs []lexutil.LexLink
for _, entry := range entries {
+
var cids []cid.Cid
+
if entry.Cid != "" {
+
if err := rm.s.db.Clauses(clause.OnConflict{
+
Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
+
UpdateAll: true,
+
}).Create(&entry).Error; err != nil {
+
return nil, err
+
}
+
cids, err = rm.incrementBlobRefs(urepo, entry.Value)
+
if err != nil {
+
return nil, err
+
}
+
} else {
+
if err := rm.s.db.Delete(&entry).Error; err != nil {
+
return nil, err
+
}
+
cids, err = rm.decrementBlobRefs(urepo, entry.Value)
+
if err != nil {
+
return nil, err
+
}
}
for _, c := range cids {
···
for _, c := range cids {
if err := rm.db.Exec("UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", urepo.Did, c.Bytes()).Error; err != nil {
return nil, err
+
}
+
}
+
+
return cids, nil
+
}
+
+
func (rm *RepoMan) decrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
+
cids, err := getBlobCidsFromCbor(cbor)
+
if err != nil {
+
return nil, err
+
}
+
+
for _, c := range cids {
+
var res struct {
+
ID uint
+
Count int
+
}
+
if err := rm.db.Raw("UPDATE blobs SET ref_count = ref_count - 1 WHERE did = ? AND cid = ? RETURNING id, ref_count", urepo.Did, c.Bytes()).Scan(&res).Error; err != nil {
+
return nil, err
+
}
+
+
if res.Count == 0 {
+
if err := rm.db.Exec("DELETE FROM blobs WHERE id = ?", res.ID).Error; err != nil {
+
return nil, err
+
}
+
if err := rm.db.Exec("DELETE FROM blob_parts WHERE blob_id = ?", res.ID).Error; err != nil {
+
return nil, err
+
}
}
}
+1
server/server.go
···
// repo
s.echo.POST("/xrpc/com.atproto.repo.createRecord", s.handleCreateRecord, s.handleSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.repo.putRecord", s.handlePutRecord, s.handleSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.repo.applyWrites", s.handleApplyWrites, s.handleSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.repo.uploadBlob", s.handleRepoUploadBlob, s.handleSessionMiddleware)
···
// repo
s.echo.POST("/xrpc/com.atproto.repo.createRecord", s.handleCreateRecord, s.handleSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.repo.putRecord", s.handlePutRecord, s.handleSessionMiddleware)
+
s.echo.POST("/xrpc/com.atproto.repo.deleteRecord", s.handleDeleteRecord, s.handleSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.repo.applyWrites", s.handleApplyWrites, s.handleSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.repo.uploadBlob", s.handleRepoUploadBlob, s.handleSessionMiddleware)