an app.bsky.* indexer

start storing feedgens

Changed files
+68 -7
cmd
+3 -1
cmd/backfiller/backend.go
···
type Backend struct {
state *gorm.DB
+
data *gorm.DB
bf *backfill.Backfiller
backfillComplete bool
···
reposSeq string
}
-
func NewBackend(state *gorm.DB) *Backend {
+
func NewBackend(state, data *gorm.DB) *Backend {
return &Backend{
state: state,
+
data: data,
}
}
+30
cmd/backfiller/database.go
···
+
package main
+
+
import (
+
"log/slog"
+
+
"gorm.io/driver/sqlite"
+
"gorm.io/gorm"
+
)
+
+
type FeedGenerator struct {
+
ID uint `gorm:"primaryKey"`
+
AtUri string `gorm:"unique"`
+
DisplayName string
+
Description string
+
FeedService string
+
AcceptsInteractions bool
+
ContentMode string
+
CreatedAt string
+
}
+
+
func NewDatabase() *gorm.DB {
+
sl := slog.With("source", "database")
+
db, err := gorm.Open(sqlite.Open("database.db"), &gorm.Config{})
+
if err != nil {
+
sl.Error("failed to connect to database", "err", err)
+
}
+
db.AutoMigrate(&FeedGenerator{})
+
+
return db
+
}
+26
cmd/backfiller/handlers.go
···
import (
"context"
+
"encoding/json"
"errors"
"fmt"
"strconv"
+
"strings"
comatproto "github.com/bluesky-social/indigo/api/atproto"
+
appbsky "github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/backfill"
"github.com/ipfs/go-cid"
)
···
type handleOpDelete func(context.Context, string, string, string) error
func (b *Backend) HandleCreateOp(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
+
if !strings.HasPrefix(path, "app.bsky.feed.generator/") {
+
return nil
+
}
+
+
var out appbsky.FeedGenerator
+
if err := json.Unmarshal(*rec, &out); err != nil {
+
return fmt.Errorf("failed to unmarshal feedgen record: %w", err)
+
}
+
+
feedgen := &FeedGenerator{
+
AtUri: fmt.Sprintf("at://%s/%s", repo, path),
+
DisplayName: out.DisplayName,
+
FeedService: out.Did,
+
CreatedAt: out.CreatedAt,
+
Description: *out.Description,
+
AcceptsInteractions: *out.AcceptsInteractions,
+
ContentMode: *out.ContentMode,
+
}
+
+
if err := b.data.Model(&FeedGenerator{}).Create(feedgen).Error; err != nil {
+
return fmt.Errorf("error adding feedgen to database: %w", err)
+
}
+
return nil
}
+9 -6
cmd/backfiller/main.go
···
"gorm.io/gorm/logger"
)
-
func NewDatabase() *gorm.DB {
-
sl := slog.With("source", "database")
+
func NewState() *gorm.DB {
+
sl := slog.With("source", "state")
newLogger := logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags),
logger.Config{
···
ParallelRecordCreates: 1, // sqlite
SyncRequestsPerSecond: 5,
-
RelayHost: "https://bsky.network",
+
// NSIDFilter: "app.bsky.feed.generator",
+
RelayHost: "https://bsky.network",
}
return backfill.NewBackfiller(
···
streamClosed := make(chan struct{})
streamCtx, streamCancel := context.WithCancel(context.Background())
+
state := NewState()
db := NewDatabase()
-
backend := NewBackend(db)
+
backend := NewBackend(state, db)
-
bf := NewBackfiller(db, backend.HandleCreateOp, backend.HandleUpdateOp, backend.HandleDeleteOp)
+
bf := NewBackfiller(state, backend.HandleCreateOp, backend.HandleUpdateOp, backend.HandleDeleteOp)
go bf.Start()
-
// attach the backfiller to the backend so pump and repo commit handler can use it
+
// attach the backfiller to the backend so the curors, the pump,
+
// and the commit callback can use it
backend.bf = bf
go backend.SyncCursors(streamCtx)