an app.bsky.* indexer

rework database creations, start storing feedgens

Changed files
+82 -55
+44 -16
cmd/backfiller/database.go
···
 package main

 import (
+	"fmt"
+	"log"
 	"log/slog"
+	"os"
+	"time"

 	"gorm.io/driver/sqlite"
 	"gorm.io/gorm"
+	"gorm.io/gorm/logger"
 )

-type FeedGenerator struct {
-	ID uint `gorm:"primaryKey"`
-	AtUri string `gorm:"unique"`
-	DisplayName string
-	Description string
-	FeedService string
-	AcceptsInteractions bool
-	ContentMode string
-	CreatedAt string
-}
-
-func NewDatabase() *gorm.DB {
+func NewDatabase(path string) *gorm.DB {
 	sl := slog.With("source", "database")
-	db, err := gorm.Open(sqlite.Open("database.db"), &gorm.Config{})
+	l := logger.New(
+		log.New(os.Stdout, "\r\n", log.LstdFlags),
+		logger.Config{
+			SlowThreshold: time.Second,
+			Colorful:      false,
+		},
+	)
+	db, err := gorm.Open(sqlite.Open(path), &gorm.Config{
+		Logger: l,
+	})
 	if err != nil {
-		sl.Error("failed to connect to database", "err", err)
+		sl.Error("failed to open database", "err", err)
 	}
-	db.AutoMigrate(&FeedGenerator{})
+	db.Exec("PRAGMA journal_mode=WAL")
+	return db
+}

-	return db
+func (b *Backend) CleanlyClose() error {
+	closeDb := func(db *gorm.DB) error {
+		if err := db.Exec("PRAGMA wal_checkpoint(TRUNCATE)").Error; err != nil {
+			return fmt.Errorf("failed checkpointing the WAL: %w", err)
+		}
+		rawDb, err := db.DB()
+		if err != nil {
+			return fmt.Errorf("failed getting underlying DB connection: %w", err)
+		}
+		if err := rawDb.Close(); err != nil {
+			return fmt.Errorf("failed closing underlying DB connection: %w", err)
+		}
+		return nil
+	}
+
+	if err := closeDb(b.state); err != nil {
+		return fmt.Errorf("failed to close state database: %w", err)
+	}
+
+	if err := closeDb(b.data); err != nil {
+		return fmt.Errorf("failed to close content database: %w", err)
+	}
+
+	return nil
 }
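NewDatabase switches SQLite into WAL mode but discards the pragma's result; PRAGMA journal_mode returns the active mode as a row, so a caller can confirm the switch actually took effect. A minimal sketch of such a check, not part of this change (the helper name is illustrative):

package main

import (
	"log/slog"

	"gorm.io/gorm"
)

// verifyWALMode is a hypothetical helper: it re-queries the journal mode
// after NewDatabase has run and warns if SQLite did not end up in WAL mode.
func verifyWALMode(db *gorm.DB) {
	var mode string
	if err := db.Raw("PRAGMA journal_mode").Scan(&mode).Error; err != nil {
		slog.Error("failed to query journal mode", "err", err)
		return
	}
	if mode != "wal" {
		slog.Warn("database is not in WAL mode", "mode", mode)
	}
}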
+10 -6
cmd/backfiller/handlers.go
···
 package main

 import (
+	"bytes"
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
+	"log/slog"
 	"strconv"
 	"strings"
···
 		return nil
 	}
+	sl := slog.With("source", "commitHandler")
+
 	var out appbsky.FeedGenerator
-	if err := json.Unmarshal(*rec, &out); err != nil {
-		return fmt.Errorf("failed to unmarshal feedgen record: %w", err)
+	if err := out.UnmarshalCBOR(bytes.NewReader(*rec)); err != nil {
+		sl.Error("failed to unmarshal record", "err", err)
+		return fmt.Errorf("failed to unmarshal record: %w", err)
 	}

 	feedgen := &FeedGenerator{
···
 		DisplayName: out.DisplayName,
 		FeedService: out.Did,
 		CreatedAt: out.CreatedAt,
-		Description: *out.Description,
-		AcceptsInteractions: *out.AcceptsInteractions,
-		ContentMode: *out.ContentMode,
+		Description: out.Description,
+		ContentMode: out.ContentMode,
+		AcceptsInteractions: out.AcceptsInteractions,
 	}

 	if err := b.data.Model(&FeedGenerator{}).Create(feedgen).Error; err != nil {
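Records carried in repo commits are DAG-CBOR encoded, which is why the handler now decodes through the generated UnmarshalCBOR instead of json.Unmarshal. A minimal round-trip sketch under that assumption (the helper is illustrative and not part of the change):

package main

import (
	"bytes"

	appbsky "github.com/bluesky-social/indigo/api/bsky"
)

// roundTrip is a hypothetical check: indigo's generated lexicon types carry
// MarshalCBOR/UnmarshalCBOR, matching how records are stored in repos.
func roundTrip(rec *appbsky.FeedGenerator) (*appbsky.FeedGenerator, error) {
	var buf bytes.Buffer
	if err := rec.MarshalCBOR(&buf); err != nil {
		return nil, err
	}
	var out appbsky.FeedGenerator
	if err := out.UnmarshalCBOR(bytes.NewReader(buf.Bytes())); err != nil {
		return nil, err
	}
	return &out, nil
}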
+16 -33
cmd/backfiller/main.go
···
 import (
 	"context"
-	"log"
 	"log/slog"
 	"net/http"
 	"os"
···
 	"github.com/bluesky-social/indigo/events/schedulers/parallel"
 	"github.com/gorilla/websocket"
-	"gorm.io/driver/sqlite"
 	"gorm.io/gorm"
-	"gorm.io/gorm/logger"
 )

-func NewState() *gorm.DB {
-	sl := slog.With("source", "state")
-	newLogger := logger.New(
-		log.New(os.Stdout, "\r\n", log.LstdFlags),
-		logger.Config{
-			SlowThreshold: 1 * time.Second,
-			Colorful: false,
-		},
-	)
-	db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
-		Logger: newLogger,
-	})
-	if err != nil {
-		sl.Error("failed to connect to database", "err", err)
-	}
-	db.AutoMigrate(&backfill.GormDBJob{})
-	db.AutoMigrate(&cursorRecord{})
-
-	return db
-}
-
 func NewBackfiller(
 	db *gorm.DB, create handleOpCreateUpdate, update handleOpCreateUpdate, delete handleOpDelete,
 ) *backfill.Backfiller {
···
 		SyncRequestsPerSecond: 5,
 		// NSIDFilter: "app.bsky.feed.generator",
-		RelayHost: "https://bsky.network",
+		RelayHost: "https://bsky.network",
 	}
 	return backfill.NewBackfiller(
···
 	streamClosed := make(chan struct{})
 	streamCtx, streamCancel := context.WithCancel(context.Background())

-	state := NewState()
-	db := NewDatabase()
-	backend := NewBackend(state, db)
+	stateDb := NewDatabase("state.db")
+	stateDb.AutoMigrate(&backfill.GormDBJob{})
+	stateDb.AutoMigrate(&cursorRecord{})
+
+	contentDb := NewDatabase("database.db")
+	contentDb.AutoMigrate(&FeedGenerator{})

-	bf := NewBackfiller(state, backend.HandleCreateOp, backend.HandleUpdateOp, backend.HandleDeleteOp)
+	backend := NewBackend(stateDb, contentDb)
+
+	bf := NewBackfiller(stateDb, backend.HandleCreateOp, backend.HandleUpdateOp, backend.HandleDeleteOp)
 	go bf.Start()

 	// attach the backfiller to the backend so the curors, the pump,
···
 		defer cancel()
 		bf.Stop(endctx)
-		if err := backend.FlushCursors(); err != nil {
-			sl.Error("final flush cursor failed", "err", err)
-		}
-
 		close(quit)
 	}()

 	<-quit
+	sl.Info("flushing cursors")
 	if err := backend.FlushCursors(); err != nil {
 		sl.Error("failed to flush cursors on close", "err", err)
+	}
+
+	sl.Info("closing databases")
+	if err := backend.CleanlyClose(); err != nil {
+		sl.Error("failed to close databases", "err", err)
 	}
 }
+12
cmd/backfiller/models.go
···
+package main
+
+type FeedGenerator struct {
+	ID uint `gorm:"primaryKey"`
+	AtUri string `gorm:"unique"`
+	DisplayName string
+	FeedService string
+	Description *string
+	ContentMode *string
+	AcceptsInteractions *bool
+	CreatedAt string
+}
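The model now stores the lexicon's optional fields (description, contentMode, acceptsInteractions) as pointers, so rows read back from the content database need nil checks. A minimal sketch of that, assuming the content *gorm.DB handle from main (the helper name is illustrative):

package main

import (
	"fmt"

	"gorm.io/gorm"
)

// listFeedGenerators is a hypothetical helper showing how the optional
// (pointer) columns on FeedGenerator are dereferenced safely when read back.
func listFeedGenerators(db *gorm.DB) error {
	var gens []FeedGenerator
	if err := db.Find(&gens).Error; err != nil {
		return fmt.Errorf("failed listing feed generators: %w", err)
	}
	for _, g := range gens {
		desc := "(no description)"
		if g.Description != nil {
			desc = *g.Description
		}
		fmt.Printf("%s %s: %s\n", g.AtUri, g.DisplayName, desc)
	}
	return nil
}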