an app.bsky.* indexer

split out create from update

Changed files
+47 -9
cmd
+1 -1
cmd/monarch/backfill.go
···
RelayHost: "https://bsky.network",
}
-
return backfill.NewBackfiller("backfiller", store, h.HandleUpsert, h.HandleUpsert, h.HandleDelete, opts)
+
return backfill.NewBackfiller("backfiller", store, h.HandleCreate, h.HandleUpdate, h.HandleDelete, opts)
}
+46 -8
cmd/monarch/handlers.go
···
"tangled.sh/edavis.dev/monarch/models"
)
+
type Action int
+
+
const (
+
ActionCreate Action = iota
+
ActionUpdate
+
)
+
type HandlerService struct {
store *gorm.DB
}
···
}
}
+
type helper struct {
+
db *gorm.DB
+
uri syntax.ATURI
+
record []byte
+
action Action
+
}
+
+
func NewHelper(db *gorm.DB, uri syntax.ATURI, record []byte, action Action) *helper {
+
return &helper{db, uri, record, action}
+
}
+
+
func upsertRecord[T any](maker func(syntax.ATURI, []byte) T, opts *helper) error {
+
obj := maker(opts.uri, opts.record)
+
switch opts.action {
+
case ActionCreate:
+
if err := opts.db.Create(obj).Error; err != nil {
+
return fmt.Errorf("error inserting %s record: %w", opts.uri.Collection(), err)
+
}
+
case ActionUpdate:
+
if err := opts.db.Save(obj).Error; err != nil {
+
return fmt.Errorf("error updating %s record: %w", opts.uri.Collection(), err)
+
}
+
}
+
return nil
+
}
+
// handles both creates and updates
-
func (hs *HandlerService) HandleUpsert(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
+
func (hs *HandlerService) HandleUpsert(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid, action Action) error {
uri, err := syntax.ParseATURI(fmt.Sprintf("at://%s/%s", repo, path))
if err != nil {
return fmt.Errorf("error parsing at-uri: %w", err)
}
+
+
opts := NewHelper(hs.store, uri, *rec, action)
switch uri.Collection() {
case syntax.NSID("app.bsky.actor.profile"):
···
return nil
}
-
db := hs.store.Session(&gorm.Session{DryRun: true})
-
repost := models.NewFeedRepost(uri, *rec)
-
// stmt := db.Where(models.FeedRepost{ID: string(uri)}).Assign(repost).FirstOrCreate(&models.FeedRepost{}).Statement
-
stmt := db.Create(repost).Statement
-
-
slog.Info("repost sql statement", "sql", stmt.SQL.String(), "vars", stmt.Vars)
-
// if err := hs.store.Where(models.FeedRepost{ID: string(uri)}).Assign(repost).FirstOrCreate(&models.FeedRepost{}).Error; err != nil {
// return fmt.Errorf("error upserting feed repost: %w", err)
// }
+
+
return upsertRecord(models.NewFeedRepost, opts)
case syntax.NSID("app.bsky.feed.threadgate"):
threadgate := models.NewFeedThreadgate(uri, *rec)
···
}
return nil
+
}
+
+
func (hs *HandlerService) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
+
return hs.HandleUpsert(ctx, repo, rev, path, rec, cid, ActionCreate)
+
}
+
+
func (hs *HandlerService) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
+
return hs.HandleUpsert(ctx, repo, rev, path, rec, cid, ActionUpdate)
}
func (hs *HandlerService) HandleDelete(ctx context.Context, repo string, rev string, path string) error {