an app.bsky.* indexer

Merge branch 'bad-nulls'

Changed files
+53 -13
cmd
models
+1 -1
cmd/monarch/backfill.go
···
RelayHost: "https://bsky.network",
}
-
return backfill.NewBackfiller("backfiller", store, h.HandleUpsert, h.HandleUpsert, h.HandleDelete, opts)
+
return backfill.NewBackfiller("backfiller", store, h.HandleCreate, h.HandleUpdate, h.HandleDelete, opts)
}
+45 -5
cmd/monarch/handlers.go
···
"tangled.sh/edavis.dev/monarch/models"
)
+
type Action int
+
+
const (
+
ActionCreate Action = iota
+
ActionUpdate
+
)
+
type HandlerService struct {
store *gorm.DB
}
···
}
}
+
type helper struct {
+
db *gorm.DB
+
uri syntax.ATURI
+
record []byte
+
action Action
+
}
+
+
func NewHelper(db *gorm.DB, uri syntax.ATURI, record []byte, action Action) *helper {
+
return &helper{db, uri, record, action}
+
}
+
+
func upsertRecord[T any](maker func(syntax.ATURI, []byte) T, opts *helper) error {
+
obj := maker(opts.uri, opts.record)
+
switch opts.action {
+
case ActionCreate:
+
if err := opts.db.Create(obj).Error; err != nil {
+
return fmt.Errorf("error inserting %s record: %w", opts.uri.Collection(), err)
+
}
+
case ActionUpdate:
+
if err := opts.db.Save(obj).Error; err != nil {
+
return fmt.Errorf("error updating %s record: %w", opts.uri.Collection(), err)
+
}
+
}
+
return nil
+
}
+
// handles both creates and updates
-
func (hs *HandlerService) HandleUpsert(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
+
func (hs *HandlerService) HandleUpsert(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid, action Action) error {
uri, err := syntax.ParseATURI(fmt.Sprintf("at://%s/%s", repo, path))
if err != nil {
return fmt.Errorf("error parsing at-uri: %w", err)
}
+
+
opts := NewHelper(hs.store, uri, *rec, action)
switch uri.Collection() {
case syntax.NSID("app.bsky.actor.profile"):
···
}
case syntax.NSID("app.bsky.feed.repost"):
-
repost := models.NewFeedRepost(uri, *rec)
-
if err := hs.store.Where(models.FeedRepost{ID: string(uri)}).Assign(repost).FirstOrCreate(&models.FeedRepost{}).Error; err != nil {
-
return fmt.Errorf("error upserting feed repost: %w", err)
-
}
+
return upsertRecord(models.NewFeedRepost, opts)
case syntax.NSID("app.bsky.feed.threadgate"):
threadgate := models.NewFeedThreadgate(uri, *rec)
···
}
return nil
+
}
+
+
func (hs *HandlerService) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
+
return hs.HandleUpsert(ctx, repo, rev, path, rec, cid, ActionCreate)
+
}
+
+
func (hs *HandlerService) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
+
return hs.HandleUpsert(ctx, repo, rev, path, rec, cid, ActionUpdate)
}
func (hs *HandlerService) HandleDelete(ctx context.Context, repo string, rev string, path string) error {
+6 -6
models/actor_status.go
···
}
type ActorStatus_Embed struct {
-
Description string
-
Title string
-
Uri string
+
Description string
+
Title string
+
Uri string
}
func NewActorStatus(uri syntax.ATURI, rec []byte) *ActorStatus {
···
if out.Embed != nil && out.Embed.EmbedExternal != nil && out.Embed.EmbedExternal.External != nil {
status.Embed = &ActorStatus_Embed{
-
Description: out.Embed.EmbedExternal.External.Description,
-
Title: out.Embed.EmbedExternal.External.Title,
-
Uri: out.Embed.EmbedExternal.External.Uri,
+
Description: out.Embed.EmbedExternal.External.Description,
+
Title: out.Embed.EmbedExternal.External.Title,
+
Uri: out.Embed.EmbedExternal.External.Uri,
}
}
+1 -1
models/graph_starterpack.go
···
for _, feed := range out.Feeds {
pack.Feeds = append(pack.Feeds, GraphStarterpack_Feed{
GraphStarterpackID: pack.ID,
-
Uri: feed.Uri,
+
Uri: feed.Uri,
})
}
}