an app.bsky.* indexer

switch to upserts, add New* funcs, store firehose seq directly as int64, better backfill.Stop handling

Changed files
+108 -52
cmd
+1 -1
cmd/backfiller/backfill.go
···
RelayHost: "https://bsky.network",
}
-
return backfill.NewBackfiller("backfiller", store, h.HandleCreate, h.HandleUpdate, h.HandleDelete, opts)
+
return backfill.NewBackfiller("backfiller", store, h.HandleUpsert, h.HandleUpsert, h.HandleDelete, opts)
}
+10 -17
cmd/backfiller/cursors.go
···
"context"
"fmt"
"log/slog"
-
"strconv"
"sync"
"time"
···
store *gorm.DB
firehoseLk sync.Mutex
-
firehoseSeq string
+
firehoseSeq int64
reposLk sync.Mutex
reposSeq string
···
func (cs *CursorService) SetFirehoseCursor(seq int64) {
cs.firehoseLk.Lock()
-
val := strconv.Itoa(int(seq))
-
cs.firehoseSeq = val
+
cs.firehoseSeq = seq
cs.firehoseLk.Unlock()
}
···
}
func (cs *CursorService) Flush() error {
-
flusher := func(lk *sync.Mutex, key, value string) error {
-
lk.Lock()
-
if err := cs.store.Model(&cursorRecord{}).Where("key = ?", key).Update("val", value).Error; err != nil {
-
return fmt.Errorf("error updating cursor record: %+v: %w", cursorRecord{Key: key, Val: value}, err)
-
}
-
lk.Unlock()
-
return nil
+
cs.firehoseLk.Lock()
+
if err := cs.store.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", cs.firehoseSeq).Error; err != nil {
+
return fmt.Errorf("error updating cursor record: %w", err)
}
+
cs.firehoseLk.Unlock()
-
if err := flusher(&cs.firehoseLk, "firehose", cs.firehoseSeq); err != nil {
-
return err
+
cs.reposLk.Lock()
+
if err := cs.store.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", cs.reposSeq).Error; err != nil {
+
return fmt.Errorf("error updating cursor record: %w", err)
}
-
-
if err := flusher(&cs.reposLk, "repos", cs.reposSeq); err != nil {
-
return err
-
}
+
cs.reposLk.Unlock()
return nil
}
+25 -22
cmd/backfiller/handlers.go
···
package main
import (
-
"bytes"
"context"
"fmt"
-
"strings"
+
+
"github.com/bluesky-social/indigo/atproto/syntax"
-
appbsky "github.com/bluesky-social/indigo/api/bsky"
"github.com/ipfs/go-cid"
"gorm.io/gorm"
)
···
}
func NewHandlerService(store *gorm.DB) *HandlerService {
+
store.AutoMigrate(&Account{})
store.AutoMigrate(&Profile{})
-
store.AutoMigrate(&Feedgen{})
-
// TODO the rest
+
store.AutoMigrate(&List{})
+
store.AutoMigrate(&Labeler{})
+
store.AutoMigrate(&FeedGenerator{})
+
store.AutoMigrate(&Post{})
+
store.AutoMigrate(&Like{})
+
store.AutoMigrate(&StarterPack{})
+
store.AutoMigrate(&Verification{})
+
store.AutoMigrate(&Lexicon{})
return &HandlerService{
store: store,
}
}
-
func (hs *HandlerService) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
-
if !strings.HasPrefix(path, "app.bsky.feed.generator/") {
-
return nil
+
// handles both creates and updates
+
func (hs *HandlerService) HandleUpsert(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
+
uri, err := syntax.ParseATURI(fmt.Sprintf("at://%s/%s", repo, path))
+
if err != nil {
+
return err
}
-
var out appbsky.FeedGenerator
-
if err := out.UnmarshalCBOR(bytes.NewReader(*rec)); err != nil {
-
return fmt.Errorf("error unmarshalling record: %w", err)
-
}
+
switch uri.Collection() {
+
case syntax.NSID("app.bsky.actor.profile"):
+
profile := NewProfile(*rec)
+
profile.AtUri = string(uri)
+
hs.store.Create(profile)
-
feedgen := Feedgen{
-
DisplayName: out.DisplayName,
+
case syntax.NSID("app.bsky.feed.generator"):
+
feedgen := NewFeedGenerator(*rec)
+
feedgen.AtUri = string(uri)
+
hs.store.Create(feedgen)
}
-
if err := hs.store.Create(&feedgen).Error; err != nil {
-
return fmt.Errorf("error saving feedgen: %w", err)
-
}
-
-
return nil
-
}
-
-
func (hs *HandlerService) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
return nil
}
+3 -10
cmd/backfiller/main.go
···
return nil
}
-
if err := closeDatabase(app.state); err != nil {
-
return err
-
}
+
app.backfill.Stop(ctx)
-
if err := closeDatabase(app.content); err != nil {
-
return err
-
}
-
-
if err := app.backfill.Stop(ctx); err != nil {
-
return err
-
}
+
closeDatabase(app.state)
+
closeDatabase(app.content)
return nil
}
+69 -2
cmd/backfiller/models.go
···
package main
+
import (
+
"bytes"
+
"log/slog"
+
+
appbsky "github.com/bluesky-social/indigo/api/bsky"
+
)
+
type Account struct{}
-
type Profile struct{}
+
+
// func NewAccount
+
+
type Profile struct {
+
ID uint `gorm:"primaryKey"`
+
AtUri string
+
DisplayName *string
+
}
+
+
func NewProfile(rec []byte) *Profile {
+
var out appbsky.ActorProfile
+
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
+
slog.Error("could not unmarshal profile CBOR", "err", err)
+
return nil
+
}
+
return &Profile{
+
DisplayName: out.DisplayName,
+
}
+
}
+
type List struct{}
+
+
// func NewList
+
type Labeler struct{}
-
type Feedgen struct {
+
+
// func NewFeedGenerator
+
+
type FeedGenerator struct {
ID uint `gorm:"primaryKey"`
+
AtUri string
DisplayName string
}
+
+
func NewFeedGenerator(rec []byte) *FeedGenerator {
+
var out appbsky.FeedGenerator
+
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
+
slog.Error("could not unmarshal feedgen CBOR", "err", err)
+
return nil
+
}
+
return &FeedGenerator{
+
DisplayName: out.DisplayName,
+
}
+
}
+
+
type Post struct {
+
ID uint `gorm:"primaryKey"`
+
AtUri string
+
Text string
+
}
+
+
func NewPost(rec []byte) *Post {
+
return nil
+
}
+
+
type Like struct{}
+
+
// func NewLike
+
type StarterPack struct{}
+
+
// func NewStarterPack
+
type Verification struct{}
+
+
// func NewVerification
+
type Lexicon struct{}
+
+
// func NewLexicon