an app.bsky.* indexer

Merge branch 'jsonb'

Changed files
+66 -123
cmd
+21 -3
cmd/monarch/census.go
···
cursor *CursorService
backfill *backfill.Backfiller
+
seenHosts map[string]bool
+
seenLk sync.Mutex
+
storeLk sync.Mutex
}
···
func NewCensusService(cursorSvc *CursorService, backfillSvc *backfill.Backfiller) *CensusService {
return &CensusService{
-
cursor: cursorSvc,
-
backfill: backfillSvc,
+
cursor: cursorSvc,
+
backfill: backfillSvc,
+
seenHosts: make(map[string]bool),
}
}
···
}
for _, host := range res.Hosts {
+
// don't reprocess hosts already handled
+
cs.seenLk.Lock()
+
_, ok := cs.seenHosts[host.Hostname]
+
cs.seenLk.Unlock()
+
if ok {
+
slog.Info("already processed host, skipping")
+
continue
+
}
+
sem.Acquire(ctx, 1)
wg.Add(1) // TODO wg.Go
go func() {
···
}
slog.Info("finished listing repos", "host", host)
+
+
cs.seenLk.Lock()
+
defer cs.seenLk.Unlock()
+
+
cs.seenHosts[host] = true
}
func (cs *CensusService) Start(ctx context.Context, cctx *cli.Context) {
···
cs.listHosts(ctx, cctx)
slog.Info("finished with initial refresh, starting ticker")
-
t := time.NewTicker(time.Hour)
+
t := time.NewTicker(time.Minute * 5)
defer t.Stop()
for {
+1 -6
cmd/monarch/cursors.go
···
select {
case <-ctx.Done():
slog.Info("stopping cursor checkpointer", "err", ctx.Err())
-
-
slog.Info("persisting firehose cursor before exit", "seq", cs.firehoseSeq)
-
if err := cs.PersistFirehoseCursor(); err != nil {
-
slog.Error("error persisting firehose cursor", "err", err)
-
}
-
+
cs.PersistFirehoseCursor() // one final save
return
case <-t.C:
}
+44 -114
cmd/monarch/handlers.go
···
package main
import (
+
"bytes"
"context"
+
"encoding/json"
"fmt"
+
appbsky "github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/ipfs/go-cid"
"gorm.io/gorm"
-
"gorm.io/gorm/clause"
-
"tangled.sh/edavis.dev/monarch/models"
)
type Action int
···
store *gorm.DB
}
-
func NewHandlerService(store *gorm.DB) *HandlerService {
-
migrations := []any{
-
// app.bsky.*
-
&models.ActorProfile{},
-
&models.ActorProfile_Label{},
-
&models.ActorStatus{},
+
type Record struct {
+
ID int `gorm:"primaryKey"`
+
Repo string
+
Collection string
+
Key string
+
Data json.RawMessage `gorm:"type:jsonb"`
+
}
-
&models.FeedGenerator{},
-
&models.FeedGenerator_Label{},
-
&models.FeedLike{},
-
&models.FeedPost{},
-
&models.FeedPost_Label{},
-
&models.FeedPost_Lang{},
-
&models.FeedPost_Tag{},
-
&models.FeedPostgate{},
-
&models.FeedPostgate_DetachedEmbeddingUri{},
-
&models.FeedPostgate_EmbeddingRule{},
-
&models.FeedRepost{},
-
&models.FeedThreadgate{},
-
&models.FeedThreadgate_AllowRule{},
-
&models.FeedThreadgate_HiddenReply{},
-
-
&models.GraphBlock{},
-
&models.GraphFollow{},
-
&models.GraphList{},
-
&models.GraphListblock{},
-
&models.GraphListitem{},
-
&models.GraphStarterpack{},
-
&models.GraphStarterpack_Feed{},
-
&models.GraphVerification{},
-
-
&models.LabelerService{},
-
&models.LabelerService_Label{},
-
&models.LabelerService_Policy{},
-
&models.LabelerService_ReasonType{},
-
&models.LabelerService_SubjectCollection{},
-
&models.LabelerService_SubjectType{},
-
-
// chat.bsky.*
-
&models.ActorDeclaration{},
-
}
-
for _, migration := range migrations {
-
store.AutoMigrate(migration)
-
}
+
func NewHandlerService(store *gorm.DB) *HandlerService {
+
store.AutoMigrate(&Record{})
return &HandlerService{
store: store,
}
}
-
type helper struct {
-
db *gorm.DB
-
uri syntax.ATURI
-
record []byte
-
action Action
-
}
-
-
func NewHelper(db *gorm.DB, uri syntax.ATURI, record []byte, action Action) *helper {
-
return &helper{db, uri, record, action}
-
}
-
-
func upsertRecord[T any](maker func(syntax.ATURI, []byte) T, opts *helper) error {
-
obj := maker(opts.uri, opts.record)
-
switch opts.action {
-
case ActionCreate:
-
if err := opts.db.Omit(clause.Associations).Create(obj).Error; err != nil {
-
return fmt.Errorf("error inserting %s record: %w", opts.uri.Collection(), err)
-
}
-
case ActionUpdate:
-
if err := opts.db.Save(obj).Error; err != nil {
-
return fmt.Errorf("error updating %s record: %w", opts.uri.Collection(), err)
-
}
-
}
-
return nil
+
var trackedCollections = map[syntax.NSID]bool{
+
syntax.NSID("app.bsky.actor.profile"): false,
+
syntax.NSID("app.bsky.feed.generator"): true,
+
syntax.NSID("app.bsky.labeler.service"): true,
+
syntax.NSID("app.bsky.graph.list"): true,
+
syntax.NSID("app.bsky.graph.verification"): true,
+
syntax.NSID("app.bsky.graph.starterpack"): true,
}
func (hs *HandlerService) HandleUpsert(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid, action Action) error {
···
return fmt.Errorf("error parsing at-uri: %w", err)
}
-
opts := NewHelper(hs.store, uri, *rec, action)
+
tracked := trackedCollections[uri.Collection()]
+
if !tracked {
+
return nil
+
}
+
var body []byte
switch uri.Collection() {
case syntax.NSID("app.bsky.actor.profile"):
-
return upsertRecord(models.NewActorProfile, opts)
-
-
case syntax.NSID("app.bsky.actor.status"):
-
return upsertRecord(models.NewActorStatus, opts)
+
var out appbsky.ActorProfile
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
+
body, err = json.Marshal(out)
case syntax.NSID("app.bsky.feed.generator"):
-
return upsertRecord(models.NewFeedGenerator, opts)
-
-
case syntax.NSID("app.bsky.feed.like"):
-
return upsertRecord(models.NewFeedLike, opts)
-
-
case syntax.NSID("app.bsky.feed.post"):
-
return upsertRecord(models.NewFeedPost, opts)
-
-
case syntax.NSID("app.bsky.feed.postgate"):
-
return upsertRecord(models.NewFeedPostgate, opts)
-
-
case syntax.NSID("app.bsky.feed.repost"):
-
return upsertRecord(models.NewFeedRepost, opts)
-
-
case syntax.NSID("app.bsky.feed.threadgate"):
-
return upsertRecord(models.NewFeedThreadgate, opts)
-
-
case syntax.NSID("app.bsky.graph.block"):
-
return upsertRecord(models.NewGraphBlock, opts)
-
-
case syntax.NSID("app.bsky.graph.follow"):
-
return upsertRecord(models.NewGraphFollow, opts)
-
-
case syntax.NSID("app.bsky.graph.list"):
-
return upsertRecord(models.NewGraphList, opts)
-
-
case syntax.NSID("app.bsky.graph.listblock"):
-
return upsertRecord(models.NewGraphListblock, opts)
-
-
case syntax.NSID("app.bsky.graph.listitem"):
-
return upsertRecord(models.NewGraphListitem, opts)
-
-
case syntax.NSID("app.bsky.graph.starterpack"):
-
return upsertRecord(models.NewGraphStarterpack, opts)
-
-
case syntax.NSID("app.bsky.graph.verification"):
-
return upsertRecord(models.NewGraphVerification, opts)
+
var out appbsky.FeedGenerator
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
+
body, err = json.Marshal(out)
case syntax.NSID("app.bsky.labeler.service"):
-
return upsertRecord(models.NewLabelerService, opts)
+
var out appbsky.LabelerService
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
+
body, err = json.Marshal(out)
+
}
-
case syntax.NSID("chat.bsky.actor.declaration"):
-
return upsertRecord(models.NewActorDeclaration, opts)
+
switch action {
+
case ActionCreate:
+
if err := hs.store.Create(&Record{
+
Repo: repo,
+
Collection: string(uri.Collection()),
+
Key: string(uri.RecordKey()),
+
Data: json.RawMessage(body),
+
}).Error; err != nil {
+
return err
+
}
}
return nil