an app.bsky.* indexer

WIP of why subject/via are inconsistently added

Changed files
+38 -7
cmd
models
+2 -1
cmd/monarch/census.go
···
return
}
+
jmstore.GetOrCreateJob(ctx, "did:plc:4nsduwlpivpuur4mqkbfvm6a", backfill.StateEnqueued)
+
curs, _ := cs.cursor.Get("repos")
for {
select {
case <-ctx.Done():
slog.Info("stopping repo census")
return
-
default:
}
res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
+16 -3
cmd/monarch/handlers.go
···
import (
"context"
"fmt"
+
"log/slog"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/ipfs/go-cid"
···
}
case syntax.NSID("app.bsky.feed.repost"):
-
repost := models.NewFeedRepost(uri, *rec)
-
if err := hs.store.Where(models.FeedRepost{ID: string(uri)}).Assign(repost).FirstOrCreate(&models.FeedRepost{}).Error; err != nil {
-
return fmt.Errorf("error upserting feed repost: %w", err)
+
if uri.RecordKey() == "3lvhrfzuzb32k" || uri.RecordKey() == "3lvcmas2cw32k" {
+
slog.Info("got repost", "uri", uri)
+
} else {
+
return nil
}
+
+
db := hs.store.Session(&gorm.Session{DryRun: true})
+
repost := models.NewFeedRepost(uri, *rec)
+
// stmt := db.Where(models.FeedRepost{ID: string(uri)}).Assign(repost).FirstOrCreate(&models.FeedRepost{}).Statement
+
stmt := db.Create(repost).Statement
+
+
slog.Info("repost sql statement", "sql", stmt.SQL.String(), "vars", stmt.Vars)
+
+
// if err := hs.store.Where(models.FeedRepost{ID: string(uri)}).Assign(repost).FirstOrCreate(&models.FeedRepost{}).Error; err != nil {
+
// return fmt.Errorf("error upserting feed repost: %w", err)
+
// }
case syntax.NSID("app.bsky.feed.threadgate"):
threadgate := models.NewFeedThreadgate(uri, *rec)
+2 -1
cmd/monarch/main.go
···
rsc := events.RepoStreamCallbacks{
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
app.cursor.SetFirehoseCursor(evt.Seq)
-
return app.backfill.HandleEvent(ctx, evt)
+
//return app.backfill.HandleEvent(ctx, evt)
+
return nil
},
// TODO account
// TODO identity
+18 -2
models/feed_repost.go
···
ID string `gorm:"primaryKey"`
CreatedAt string
-
Subject *StrongRef `gorm:"embedded;embeddedPrefix:subject_"`
-
Via *StrongRef `gorm:"embedded;embeddedPrefix:via_"`
+
+
Subject *StrongRef `gorm:"embedded;embeddedPrefix:subject_"`
+
Via *StrongRef `gorm:"embedded;embeddedPrefix:via_"`
+
+
// SubjectUri string
+
// ViaUri string
AutoCreatedAt time.Time `gorm:"autoCreateTime"`
AutoUpdatedAt time.Time `gorm:"autoUpdateTime"`
···
slog.Error("could not unmarshal feed repost CBOR", "err", err)
return nil
}
+
+
slog.Info("repost debug", "out", out)
repost := FeedRepost{
ID: string(uri),
···
}
if out.Subject != nil {
+
slog.Info("out.Subject is not nil")
repost.Subject = &StrongRef{
Uri: out.Subject.Uri,
Cid: out.Subject.Cid,
}
+
// repost.SubjectUri = out.Subject.Uri
+
} else {
+
slog.Info("out.Subject is nil")
}
if out.Via != nil {
+
slog.Info("out.Via is not nil")
repost.Via = &StrongRef{
Uri: out.Via.Uri,
Cid: out.Via.Cid,
}
+
// repost.ViaUri = out.Via.Uri
+
} else {
+
slog.Info("out.Via is nil")
}
+
+
slog.Info("final repost", "repost", repost)
return &repost
}