an app.bsky.* indexer

Compare changes

Choose any two refs to compare.

+15
cmd/backfiller2/backfill.go
···
+
package main
+
+
import "github.com/bluesky-social/indigo/backfill"
+
+
func NewBackfillService(store backfill.Store, h *HandlerService) *backfill.Backfiller {
+
opts := &backfill.BackfillOptions{
+
ParallelBackfills: 10,
+
ParallelRecordCreates: 1,
+
NSIDFilter: "",
+
SyncRequestsPerSecond: 5,
+
RelayHost: "https://bsky.network",
+
}
+
+
return backfill.NewBackfiller("backfiller", store, h.HandleCreate, h.HandleUpdate, h.HandleDelete, opts)
+
}
+71
cmd/backfiller2/census.go
···
+
package main
+
+
import (
+
"context"
+
"log/slog"
+
+
"github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/backfill"
+
"github.com/bluesky-social/indigo/xrpc"
+
)
+
+
type CensusService struct {
+
cursor *CursorService
+
backfill *backfill.Backfiller
+
}
+
+
type jobMaker interface {
+
GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
+
}
+
+
func NewCensusService(cursorSvc *CursorService, backfillSvc *backfill.Backfiller) *CensusService {
+
return &CensusService{
+
cursor: cursorSvc,
+
backfill: backfillSvc,
+
}
+
}
+
+
func (cs *CensusService) Start(ctx context.Context) {
+
xrpcc := &xrpc.Client{
+
Host: "https://bsky.network",
+
}
+
+
jmstore, ok := cs.backfill.Store.(jobMaker)
+
if !ok {
+
slog.Error("configured job store doesn't support random job creation")
+
return
+
}
+
+
curs, _ := cs.cursor.Get("repos")
+
for {
+
select {
+
case <-ctx.Done():
+
slog.Info("stopping repo census")
+
return
+
default:
+
}
+
+
slog.Info("listing repos", "cursor", curs)
+
res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
+
if err != nil {
+
slog.Error("error listing repos", "err", err)
+
return
+
}
+
+
for _, repo := range res.Repos {
+
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
+
if err != nil {
+
slog.Error("error adding listed repo to backfiller", "err", err)
+
}
+
}
+
+
if res.Cursor != nil && *res.Cursor != "" {
+
cs.cursor.reposLk.Lock()
+
curs = *res.Cursor
+
cs.cursor.reposSeq = curs
+
cs.cursor.reposLk.Unlock()
+
} else {
+
break
+
}
+
}
+
}
+109
cmd/backfiller2/cursors.go
···
+
package main
+
+
import (
+
"context"
+
"fmt"
+
"log/slog"
+
"strconv"
+
"sync"
+
"time"
+
+
"gorm.io/gorm"
+
)
+
+
type cursorRecord struct {
+
ID uint `gorm:"primaryKey"`
+
Key string
+
Val string
+
}
+
+
type CursorService struct {
+
store *gorm.DB
+
+
firehoseLk sync.Mutex
+
firehoseSeq string
+
+
reposLk sync.Mutex
+
reposSeq string
+
}
+
+
func NewCursorService(store *gorm.DB) *CursorService {
+
store.AutoMigrate(&cursorRecord{})
+
+
var rec cursorRecord
+
store.First(&rec, 1)
+
if rec.ID == 0 {
+
store.Create(&cursorRecord{ID: 1, Key: "firehose", Val: ""})
+
}
+
+
store.First(&rec, 2)
+
if rec.ID == 0 {
+
store.Create(&cursorRecord{ID: 2, Key: "repos", Val: ""})
+
}
+
+
return &CursorService{
+
store: store,
+
}
+
}
+
+
func (cs *CursorService) Get(key string) (string, error) {
+
var rec cursorRecord
+
if err := cs.store.Where("key = ?", key).First(&rec).Error; err != nil {
+
return "", fmt.Errorf("error fetching cursor record: %w", err)
+
}
+
return rec.Val, nil
+
}
+
+
func (cs *CursorService) SetFirehoseCursor(seq int64) {
+
cs.firehoseLk.Lock()
+
val := strconv.Itoa(int(seq))
+
cs.firehoseSeq = val
+
cs.firehoseLk.Unlock()
+
}
+
+
func (cs *CursorService) SetReposCursor(value string) {
+
cs.reposLk.Lock()
+
cs.reposSeq = value
+
cs.reposLk.Unlock()
+
}
+
+
func (cs *CursorService) Flush() error {
+
flusher := func(lk *sync.Mutex, key, value string) error {
+
lk.Lock()
+
if err := cs.store.Model(&cursorRecord{}).Where("key = ?", key).Update("val", value).Error; err != nil {
+
return fmt.Errorf("error updating cursor record: %+v: %w", cursorRecord{Key: key, Val: value}, err)
+
}
+
lk.Unlock()
+
return nil
+
}
+
+
if err := flusher(&cs.firehoseLk, "firehose", cs.firehoseSeq); err != nil {
+
return err
+
}
+
+
if err := flusher(&cs.reposLk, "repos", cs.reposSeq); err != nil {
+
return err
+
}
+
+
return nil
+
}
+
+
func (cs *CursorService) CheckpointCursors(ctx context.Context) {
+
t := time.NewTicker(time.Second * 5)
+
defer t.Stop()
+
+
for {
+
select {
+
case <-ctx.Done():
+
slog.Info("stopping cursor checkpointer")
+
return
+
case <-t.C:
+
}
+
+
slog.Info("flushing cursors")
+
if err := cs.Flush(); err != nil {
+
slog.Error("error flushing cursors", "err", err)
+
return
+
}
+
}
+
}
+31
cmd/backfiller2/database.go
···
+
package main
+
+
import (
+
"log"
+
"log/slog"
+
"os"
+
"time"
+
+
"gorm.io/driver/sqlite"
+
"gorm.io/gorm"
+
"gorm.io/gorm/logger"
+
)
+
+
func NewDatabase(path string) *gorm.DB {
+
sl := slog.With("source", "database")
+
l := logger.New(
+
log.New(os.Stdout, "\r\n", log.LstdFlags),
+
logger.Config{
+
SlowThreshold: time.Second,
+
Colorful: false,
+
},
+
)
+
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{
+
Logger: l,
+
})
+
if err != nil {
+
sl.Error("failed to open database", "err", err)
+
}
+
db.Exec("PRAGMA journal_mode=WAL")
+
return db
+
}
+25
cmd/backfiller2/firehose.go
···
+
package main
+
+
import (
+
"context"
+
"net/http"
+
+
"github.com/gorilla/websocket"
+
)
+
+
func NewFirehoseConnection(ctx context.Context, cursorSvc *CursorService) (*websocket.Conn, error) {
+
url := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
+
curs, _ := cursorSvc.Get("firehose")
+
if curs != "" {
+
url += "?cursor=" + curs
+
}
+
+
conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, http.Header{
+
"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
+
})
+
if err != nil {
+
return nil, err
+
}
+
+
return conn, nil
+
}
+55
cmd/backfiller2/handlers.go
···
+
package main
+
+
import (
+
"bytes"
+
"context"
+
"fmt"
+
"strings"
+
+
appbsky "github.com/bluesky-social/indigo/api/bsky"
+
"github.com/ipfs/go-cid"
+
"gorm.io/gorm"
+
)
+
+
type HandlerService struct {
+
store *gorm.DB
+
}
+
+
func NewHandlerService(store *gorm.DB) *HandlerService {
+
store.AutoMigrate(&Profile{})
+
store.AutoMigrate(&Feedgen{})
+
// TODO the rest
+
+
return &HandlerService{
+
store: store,
+
}
+
}
+
+
func (hs *HandlerService) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
+
if !strings.HasPrefix(path, "app.bsky.feed.generator/") {
+
return nil
+
}
+
+
var out appbsky.FeedGenerator
+
if err := out.UnmarshalCBOR(bytes.NewReader(*rec)); err != nil {
+
return fmt.Errorf("error unmarshalling record: %w", err)
+
}
+
+
feedgen := Feedgen{
+
DisplayName: out.DisplayName,
+
}
+
+
if err := hs.store.Create(&feedgen).Error; err != nil {
+
return fmt.Errorf("error saving feedgen: %w", err)
+
}
+
+
return nil
+
}
+
+
func (hs *HandlerService) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
+
return nil
+
}
+
+
func (hs *HandlerService) HandleDelete(ctx context.Context, repo string, rev string, path string) error {
+
return nil
+
}
+122
cmd/backfiller2/main.go
···
+
package main
+
+
import (
+
"context"
+
"fmt"
+
"log/slog"
+
"os/signal"
+
"syscall"
+
"time"
+
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/backfill"
+
"github.com/bluesky-social/indigo/events"
+
"github.com/bluesky-social/indigo/events/schedulers/parallel"
+
"github.com/gorilla/websocket"
+
"gorm.io/gorm"
+
)
+
+
type App struct {
+
backfill *backfill.Backfiller
+
cursor *CursorService
+
handler *HandlerService
+
census *CensusService
+
wsconn *websocket.Conn
+
state *gorm.DB
+
content *gorm.DB
+
}
+
+
func NewApp() *App {
+
stateDatabase := NewDatabase("state.db")
+
stateDatabase.AutoMigrate(&backfill.GormDBJob{})
+
+
contentDatabase := NewDatabase("content.db")
+
+
return &App{
+
state: stateDatabase,
+
content: contentDatabase,
+
}
+
}
+
+
func (app *App) Start(ctx context.Context) error {
+
app.cursor = NewCursorService(app.state)
+
go app.cursor.CheckpointCursors(ctx)
+
+
app.handler = NewHandlerService(app.content)
+
+
app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler)
+
go app.backfill.Start()
+
+
app.census = NewCensusService(app.cursor, app.backfill)
+
go app.census.Start(ctx)
+
+
wsconn, err := NewFirehoseConnection(ctx, app.cursor)
+
if err != nil {
+
return fmt.Errorf("error connecting to relay: %w", err)
+
}
+
app.wsconn = wsconn
+
+
rsc := events.RepoStreamCallbacks{
+
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
+
app.cursor.SetFirehoseCursor(evt.Seq)
+
return app.backfill.HandleEvent(ctx, evt)
+
},
+
// TODO account
+
// TODO identity
+
}
+
+
sched := parallel.NewScheduler(4, 50, "firehose", rsc.EventHandler)
+
+
if err := events.HandleRepoStream(ctx, app.wsconn, sched, nil); err != nil {
+
return fmt.Errorf("error starting repo stream handler: %w", err)
+
}
+
+
return nil
+
}
+
+
func (app *App) Stop(ctx context.Context) error {
+
closeDatabase := func(db *gorm.DB) error {
+
raw, err := db.DB()
+
if err != nil {
+
return fmt.Errorf("error getting raw DB: %w", err)
+
}
+
if err := raw.Close(); err != nil {
+
return fmt.Errorf("error closing DB: %w", err)
+
}
+
return nil
+
}
+
+
if err := closeDatabase(app.state); err != nil {
+
return err
+
}
+
+
if err := closeDatabase(app.content); err != nil {
+
return err
+
}
+
+
if err := app.backfill.Stop(ctx); err != nil {
+
return err
+
}
+
+
return nil
+
}
+
+
func main() {
+
ctx, cancel := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM)
+
defer cancel()
+
+
app := NewApp()
+
if err := app.Start(ctx); err != nil {
+
slog.Error("failed to start backfiller", "err", err)
+
}
+
+
<-ctx.Done()
+
slog.Info("shutting down")
+
+
endctx, cancel := context.WithTimeout(context.TODO(), time.Second*15)
+
defer cancel()
+
+
if err := app.Stop(endctx); err != nil {
+
slog.Error("error during shutdown", "err", err)
+
}
+
}
+13
cmd/backfiller2/models.go
···
+
package main
+
+
type Account struct{}
+
type Profile struct{}
+
type List struct{}
+
type Labeler struct{}
+
type Feedgen struct {
+
ID uint `gorm:"primaryKey"`
+
DisplayName string
+
}
+
type StarterPack struct{}
+
type Verification struct{}
+
type Lexicon struct{}
cmd/backfiller2/census.go cmd/backfiller/census.go
cmd/backfiller2/firehose.go cmd/backfiller/firehose.go
+10 -17
cmd/backfiller/cursors.go
···
"context"
"fmt"
"log/slog"
-
"strconv"
"sync"
"time"
···
store *gorm.DB
firehoseLk sync.Mutex
-
firehoseSeq string
+
firehoseSeq int64
reposLk sync.Mutex
reposSeq string
···
func (cs *CursorService) SetFirehoseCursor(seq int64) {
cs.firehoseLk.Lock()
-
val := strconv.Itoa(int(seq))
-
cs.firehoseSeq = val
+
cs.firehoseSeq = seq
cs.firehoseLk.Unlock()
}
···
}
func (cs *CursorService) Flush() error {
-
flusher := func(lk *sync.Mutex, key, value string) error {
-
lk.Lock()
-
if err := cs.store.Model(&cursorRecord{}).Where("key = ?", key).Update("val", value).Error; err != nil {
-
return fmt.Errorf("error updating cursor record: %+v: %w", cursorRecord{Key: key, Val: value}, err)
-
}
-
lk.Unlock()
-
return nil
-
}
-
-
if err := flusher(&cs.firehoseLk, "firehose", cs.firehoseSeq); err != nil {
-
return err
+
cs.firehoseLk.Lock()
+
if err := cs.store.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", cs.firehoseSeq).Error; err != nil {
+
return fmt.Errorf("error updating cursor record: %w", err)
}
+
cs.firehoseLk.Unlock()
-
if err := flusher(&cs.reposLk, "repos", cs.reposSeq); err != nil {
-
return err
+
cs.reposLk.Lock()
+
if err := cs.store.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", cs.reposSeq).Error; err != nil {
+
return fmt.Errorf("error updating cursor record: %w", err)
}
+
cs.reposLk.Unlock()
return nil
}
+25 -22
cmd/backfiller/handlers.go
···
package main
import (
-
"bytes"
"context"
"fmt"
-
"strings"
-
appbsky "github.com/bluesky-social/indigo/api/bsky"
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
"github.com/ipfs/go-cid"
"gorm.io/gorm"
)
···
}
func NewHandlerService(store *gorm.DB) *HandlerService {
+
store.AutoMigrate(&Account{})
store.AutoMigrate(&Profile{})
-
store.AutoMigrate(&Feedgen{})
-
// TODO the rest
+
store.AutoMigrate(&List{})
+
store.AutoMigrate(&Labeler{})
+
store.AutoMigrate(&FeedGenerator{})
+
store.AutoMigrate(&Post{})
+
store.AutoMigrate(&Like{})
+
store.AutoMigrate(&StarterPack{})
+
store.AutoMigrate(&Verification{})
+
store.AutoMigrate(&Lexicon{})
return &HandlerService{
store: store,
}
}
-
func (hs *HandlerService) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
-
if !strings.HasPrefix(path, "app.bsky.feed.generator/") {
-
return nil
-
}
-
-
var out appbsky.FeedGenerator
-
if err := out.UnmarshalCBOR(bytes.NewReader(*rec)); err != nil {
-
return fmt.Errorf("error unmarshalling record: %w", err)
+
// handles both creates and updates
+
func (hs *HandlerService) HandleUpsert(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
+
uri, err := syntax.ParseATURI(fmt.Sprintf("at://%s/%s", repo, path))
+
if err != nil {
+
return err
}
-
feedgen := Feedgen{
-
DisplayName: out.DisplayName,
-
}
+
switch uri.Collection() {
+
case syntax.NSID("app.bsky.actor.profile"):
+
profile := NewProfile(*rec)
+
profile.AtUri = string(uri)
+
hs.store.Create(profile)
-
if err := hs.store.Create(&feedgen).Error; err != nil {
-
return fmt.Errorf("error saving feedgen: %w", err)
+
case syntax.NSID("app.bsky.feed.generator"):
+
feedgen := NewFeedGenerator(*rec)
+
feedgen.AtUri = string(uri)
+
hs.store.Create(feedgen)
}
return nil
}
-
func (hs *HandlerService) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
-
return nil
-
}
-
func (hs *HandlerService) HandleDelete(ctx context.Context, repo string, rev string, path string) error {
return nil
}
-80
cmd/backfiller/models.go
···
-
package main
-
-
import (
-
"bytes"
-
"log/slog"
-
-
appbsky "github.com/bluesky-social/indigo/api/bsky"
-
)
-
-
type Account struct{}
-
-
// func NewAccount
-
-
type Profile struct {
-
ID uint `gorm:"primaryKey"`
-
AtUri string
-
DisplayName *string
-
}
-
-
func NewProfile(rec []byte) *Profile {
-
var out appbsky.ActorProfile
-
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
-
slog.Error("could not unmarshal profile CBOR", "err", err)
-
return nil
-
}
-
return &Profile{
-
DisplayName: out.DisplayName,
-
}
-
}
-
-
type List struct{}
-
-
// func NewList
-
-
type Labeler struct{}
-
-
// func NewFeedGenerator
-
-
type FeedGenerator struct {
-
ID uint `gorm:"primaryKey"`
-
AtUri string
-
DisplayName string
-
}
-
-
func NewFeedGenerator(rec []byte) *FeedGenerator {
-
var out appbsky.FeedGenerator
-
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
-
slog.Error("could not unmarshal feedgen CBOR", "err", err)
-
return nil
-
}
-
return &FeedGenerator{
-
DisplayName: out.DisplayName,
-
}
-
}
-
-
type Post struct {
-
ID uint `gorm:"primaryKey"`
-
AtUri string
-
Text string
-
}
-
-
func NewPost(rec []byte) *Post {
-
return nil
-
}
-
-
type Like struct{}
-
-
// func NewLike
-
-
type StarterPack struct{}
-
-
// func NewStarterPack
-
-
type Verification struct{}
-
-
// func NewVerification
-
-
type Lexicon struct{}
-
-
// func NewLexicon
+6
models/strongref.go
···
+
package models
+
+
type StrongRef struct {
+
Uri string
+
Cid string
+
}
+87
models/feed_threadgate.go
···
+
package models
+
+
import (
+
"bytes"
+
"log/slog"
+
"time"
+
+
appbsky "github.com/bluesky-social/indigo/api/bsky"
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
)
+
+
type FeedThreadgate struct {
+
ID string `gorm:"primaryKey"`
+
+
AllowRules []FeedThreadgate_AllowRule
+
CreatedAt string
+
HiddenReplies []FeedThreadgate_HiddenReply
+
Post string
+
+
AutoCreatedAt time.Time `gorm:"autoCreateTime"`
+
AutoUpdatedAt time.Time `gorm:"autoUpdateTime"`
+
}
+
+
type FeedThreadgate_AllowRule struct {
+
FeedThreadgateID string
+
Rule string
+
}
+
+
type FeedThreadgate_HiddenReply struct {
+
FeedThreadgateID string
+
Uri string
+
}
+
+
func NewFeedThreadgate(uri syntax.ATURI, rec []byte) *FeedThreadgate {
+
var out appbsky.FeedThreadgate
+
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
+
slog.Error("could not unmarshal feed threadgate CBOR", "err", err)
+
return nil
+
}
+
+
threadgate := FeedThreadgate{
+
ID: string(uri),
+
CreatedAt: out.CreatedAt,
+
Post: out.Post,
+
}
+
+
for _, hidden := range out.HiddenReplies {
+
threadgate.HiddenReplies = append(threadgate.HiddenReplies, FeedThreadgate_HiddenReply{
+
FeedThreadgateID: threadgate.ID,
+
Uri: hidden,
+
})
+
}
+
+
if out.Allow != nil {
+
for _, rule := range out.Allow {
+
if rule.FeedThreadgate_MentionRule != nil {
+
threadgate.AllowRules = append(threadgate.AllowRules, FeedThreadgate_AllowRule{
+
FeedThreadgateID: threadgate.ID,
+
Rule: rule.FeedThreadgate_MentionRule.LexiconTypeID,
+
})
+
}
+
+
if rule.FeedThreadgate_FollowerRule != nil {
+
threadgate.AllowRules = append(threadgate.AllowRules, FeedThreadgate_AllowRule{
+
FeedThreadgateID: threadgate.ID,
+
Rule: rule.FeedThreadgate_FollowerRule.LexiconTypeID,
+
})
+
}
+
+
if rule.FeedThreadgate_FollowingRule != nil {
+
threadgate.AllowRules = append(threadgate.AllowRules, FeedThreadgate_AllowRule{
+
FeedThreadgateID: threadgate.ID,
+
Rule: rule.FeedThreadgate_FollowingRule.LexiconTypeID,
+
})
+
}
+
+
if rule.FeedThreadgate_ListRule != nil {
+
threadgate.AllowRules = append(threadgate.AllowRules, FeedThreadgate_AllowRule{
+
FeedThreadgateID: threadgate.ID,
+
Rule: rule.FeedThreadgate_ListRule.LexiconTypeID,
+
})
+
}
+
}
+
}
+
+
return &threadgate
+
}
-20
models/models.go
···
-
package models
-
-
// - [X] ActorProfile *
-
// - [X] ActorStatus *
-
// - [X] FeedGenerator *
-
// - [X] FeedLike
-
// - [X] FeedPost
-
// - [X] FeedPostgate
-
// - [X] FeedRepost
-
// - [X] FeedThreadgate
-
// - [X] GraphBlock *
-
// - [X] GraphFollow
-
// - [X] GraphList *
-
// - [X] GraphListblock *
-
// - [X] GraphListitem *
-
// - [X] GraphStarterpack *
-
// - [X] GraphVerification *
-
// - [X] LabelerService *
-
// - [X] ActorDeclaration
-
// - [ ] LexiconSchema *
+10 -26
models/actor_profile.go
···
CreatedAt *string
Description *string
DisplayName *string
-
JoinedViaStarterPack []ActorProfile_JoinedViaStarterPack
+
JoinedViaStarterPack *StrongRef `gorm:"embedded;embeddedPrefix:starterpack_"`
Labels []ActorProfile_Label
-
PinnedPost []ActorProfile_PinnedPost
+
PinnedPost *StrongRef `gorm:"embedded;embeddedPrefix:pinnedpost_"`
AutoCreatedAt time.Time `gorm:"autoCreateTime"`
AutoUpdatedAt time.Time `gorm:"autoUpdateTime"`
···
Value string
}
-
type ActorProfile_JoinedViaStarterPack struct {
-
ActorProfileID string
-
StrongRef
-
}
-
-
type ActorProfile_PinnedPost struct {
-
ActorProfileID string
-
StrongRef
-
}
-
func NewActorProfile(uri syntax.ATURI, rec []byte) *ActorProfile {
var out appbsky.ActorProfile
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
···
}
if out.JoinedViaStarterPack != nil {
-
profile.JoinedViaStarterPack = append(profile.JoinedViaStarterPack, ActorProfile_JoinedViaStarterPack{
-
ActorProfileID: profile.ID,
-
StrongRef: StrongRef{
-
Uri: out.JoinedViaStarterPack.Uri,
-
Cid: out.JoinedViaStarterPack.Cid,
-
},
-
})
+
profile.JoinedViaStarterPack = &StrongRef{
+
Uri: out.JoinedViaStarterPack.Uri,
+
Cid: out.JoinedViaStarterPack.Cid,
+
}
}
if out.PinnedPost != nil {
-
profile.PinnedPost = append(profile.PinnedPost, ActorProfile_PinnedPost{
-
ActorProfileID: profile.ID,
-
StrongRef: StrongRef{
-
Uri: out.PinnedPost.Uri,
-
Cid: out.PinnedPost.Cid,
-
},
-
})
+
profile.PinnedPost = &StrongRef{
+
Uri: out.PinnedPost.Uri,
+
Cid: out.PinnedPost.Cid,
+
}
}
if out.Labels != nil && out.Labels.LabelDefs_SelfLabels != nil && out.Labels.LabelDefs_SelfLabels.Values != nil {
+8 -24
models/feed_like.go
···
ID string `gorm:"primaryKey"`
CreatedAt string
-
Subject FeedLike_Subject
-
Via FeedLike_Via
+
Subject *StrongRef `gorm:"embedded;embeddedPrefix:subject_"`
+
Via *StrongRef `gorm:"embedded;embeddedPrefix:via_"`
AutoCreatedAt time.Time `gorm:"autoCreateTime"`
AutoUpdatedAt time.Time `gorm:"autoUpdateTime"`
}
-
type FeedLike_Subject struct {
-
FeedLikeID string
-
StrongRef
-
}
-
-
type FeedLike_Via struct {
-
FeedLikeID string
-
StrongRef
-
}
-
func NewFeedLike(uri syntax.ATURI, rec []byte) *FeedLike {
var out appbsky.FeedLike
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
···
}
if out.Subject != nil {
-
like.Subject = FeedLike_Subject{
-
FeedLikeID: like.ID,
-
StrongRef: StrongRef{
-
Uri: out.Subject.Uri,
-
Cid: out.Subject.Cid,
-
},
+
like.Subject = &StrongRef{
+
Uri: out.Subject.Uri,
+
Cid: out.Subject.Cid,
}
}
if out.Via != nil {
-
like.Via = FeedLike_Via{
-
FeedLikeID: like.ID,
-
StrongRef: StrongRef{
-
Uri: out.Via.Uri,
-
Cid: out.Via.Cid,
-
},
+
like.Via = &StrongRef{
+
Uri: out.Via.Uri,
+
Cid: out.Via.Cid,
}
}
+2 -2
models/feed_postgate.go
···
type FeedPostgate_EmbeddingRule struct {
FeedPostgateID string
-
DisableRule string
+
Rule string
}
func NewFeedPostgate(uri syntax.ATURI, rec []byte) *FeedPostgate {
···
if rule.FeedPostgate_DisableRule != nil {
postgate.EmbeddingRules = append(postgate.EmbeddingRules, FeedPostgate_EmbeddingRule{
FeedPostgateID: postgate.ID,
-
DisableRule: rule.FeedPostgate_DisableRule.LexiconTypeID,
+
Rule: rule.FeedPostgate_DisableRule.LexiconTypeID,
})
}
}
+1
models/graph_follow.go
···
}
return &GraphFollow{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
Subject: out.Subject,
}
+1
models/graph_list.go
···
}
list := GraphList{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
Description: out.Description,
Name: out.Name,
+1 -1
cmd/monarch/main.go
···
},
&cli.IntFlag{
Name: "sync-requests-limit",
-
Value: 30,
+
Value: 10, // ratelimit-policy: 3000;w=300
},
}
+4 -20
cmd/monarch/census.go
···
)
type CensusService struct {
-
cursor *CursorService
-
backfill *backfill.Backfiller
-
+
cursor *CursorService
+
backfill *backfill.Backfiller
seenHosts map[string]bool
-
seenLk sync.Mutex
-
-
storeLk sync.Mutex
}
type jobMaker interface {
···
for _, host := range res.Hosts {
// don't reprocess hosts already handled
-
cs.seenLk.Lock()
-
_, ok := cs.seenHosts[host.Hostname]
-
cs.seenLk.Unlock()
-
if ok {
+
seen := cs.seenHosts[host.Hostname]
+
if seen {
slog.Info("already processed host, skipping", "host", host)
continue
}
···
return
}
-
cs.storeLk.Lock()
hcur, err := cs.cursor.GetHostCursor(host)
if err != nil {
slog.Error("error fetching host cursor", "err", err)
}
-
cs.storeLk.Unlock()
var added int
curs := hcur.Cursor
···
continue
}
-
cs.storeLk.Lock()
for _, repo := range res.Repos {
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
if err != nil {
···
added += 1
}
}
-
cs.storeLk.Unlock()
if res.Cursor != nil && *res.Cursor != "" {
curs = *res.Cursor
-
cs.storeLk.Lock()
if err := cs.cursor.SetHostCursor(host, curs); err != nil {
slog.Error("error updating cursor for host", "err", err)
}
-
cs.storeLk.Unlock()
} else {
break
}
}
slog.Info("finished listing repos", "host", host)
-
-
cs.seenLk.Lock()
-
defer cs.seenLk.Unlock()
-
cs.seenHosts[host] = true
}
+1 -4
cmd/monarch/cursors.go
···
import (
"context"
"log/slog"
-
"sync"
"time"
"gorm.io/gorm"
)
type CursorService struct {
-
store *gorm.DB
-
-
firehoseLk sync.Mutex
+
store *gorm.DB
firehoseSeq int64
}
+3 -1
cmd/monarch/firehose.go
···
func NewFirehoseConnection(ctx context.Context, cctx *cli.Context, cursorSvc *CursorService) (*websocket.Conn, error) {
url := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos", cctx.String("relay-host"))
curs, err := cursorSvc.GetFirehoseCursor()
-
if err == nil { // reversed
+
if err != nil {
+
slog.Error("error getting firehose cursor", "err", err)
+
} else if curs > 0 {
url += fmt.Sprintf("?cursor=%d", curs)
}