an app.bsky.* indexer

Compare changes

Choose any two refs to compare.

+10 -26
models/actor_profile.go
···
CreatedAt *string
Description *string
DisplayName *string
-
JoinedViaStarterPack []ActorProfile_JoinedViaStarterPack
+
JoinedViaStarterPack *StrongRef `gorm:"embedded;embeddedPrefix:starterpack_"`
Labels []ActorProfile_Label
-
PinnedPost []ActorProfile_PinnedPost
+
PinnedPost *StrongRef `gorm:"embedded;embeddedPrefix:pinnedpost_"`
AutoCreatedAt time.Time `gorm:"autoCreateTime"`
AutoUpdatedAt time.Time `gorm:"autoUpdateTime"`
···
Value string
}
-
type ActorProfile_JoinedViaStarterPack struct {
-
ActorProfileID string
-
StrongRef
-
}
-
-
type ActorProfile_PinnedPost struct {
-
ActorProfileID string
-
StrongRef
-
}
-
func NewActorProfile(uri syntax.ATURI, rec []byte) *ActorProfile {
var out appbsky.ActorProfile
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
···
}
if out.JoinedViaStarterPack != nil {
-
profile.JoinedViaStarterPack = append(profile.JoinedViaStarterPack, ActorProfile_JoinedViaStarterPack{
-
ActorProfileID: profile.ID,
-
StrongRef: StrongRef{
-
Uri: out.JoinedViaStarterPack.Uri,
-
Cid: out.JoinedViaStarterPack.Cid,
-
},
-
})
+
profile.JoinedViaStarterPack = &StrongRef{
+
Uri: out.JoinedViaStarterPack.Uri,
+
Cid: out.JoinedViaStarterPack.Cid,
+
}
}
if out.PinnedPost != nil {
-
profile.PinnedPost = append(profile.PinnedPost, ActorProfile_PinnedPost{
-
ActorProfileID: profile.ID,
-
StrongRef: StrongRef{
-
Uri: out.PinnedPost.Uri,
-
Cid: out.PinnedPost.Cid,
-
},
-
})
+
profile.PinnedPost = &StrongRef{
+
Uri: out.PinnedPost.Uri,
+
Cid: out.PinnedPost.Cid,
+
}
}
if out.Labels != nil && out.Labels.LabelDefs_SelfLabels != nil && out.Labels.LabelDefs_SelfLabels.Values != nil {
+8 -24
models/feed_like.go
···
ID string `gorm:"primaryKey"`
CreatedAt string
-
Subject FeedLike_Subject
-
Via FeedLike_Via
+
Subject *StrongRef `gorm:"embedded;embeddedPrefix:subject_"`
+
Via *StrongRef `gorm:"embedded;embeddedPrefix:via_"`
AutoCreatedAt time.Time `gorm:"autoCreateTime"`
AutoUpdatedAt time.Time `gorm:"autoUpdateTime"`
}
-
type FeedLike_Subject struct {
-
FeedLikeID string
-
StrongRef
-
}
-
-
type FeedLike_Via struct {
-
FeedLikeID string
-
StrongRef
-
}
-
func NewFeedLike(uri syntax.ATURI, rec []byte) *FeedLike {
var out appbsky.FeedLike
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
···
}
if out.Subject != nil {
-
like.Subject = FeedLike_Subject{
-
FeedLikeID: like.ID,
-
StrongRef: StrongRef{
-
Uri: out.Subject.Uri,
-
Cid: out.Subject.Cid,
-
},
+
like.Subject = &StrongRef{
+
Uri: out.Subject.Uri,
+
Cid: out.Subject.Cid,
}
}
if out.Via != nil {
-
like.Via = FeedLike_Via{
-
FeedLikeID: like.ID,
-
StrongRef: StrongRef{
-
Uri: out.Via.Uri,
-
Cid: out.Via.Cid,
-
},
+
like.Via = &StrongRef{
+
Uri: out.Via.Uri,
+
Cid: out.Via.Cid,
}
}
+6 -8
models/feed_post.go
···
CreatedAt string
Labels []FeedPost_Label
Langs []FeedPost_Lang
-
Reply FeedPost_Reply
+
Reply *FeedPost_Reply `gorm:"embedded"`
Tags []FeedPost_Tag
Text string
···
}
type FeedPost_Reply struct {
-
FeedPostID string
-
Parent StrongRef `gorm:"embedded;embeddedPrefix:parent_"`
-
Root StrongRef `gorm:"embedded;embeddedPrefix:root_"`
+
Root *StrongRef `gorm:"embedded;embeddedPrefix:reply_root_"`
+
Parent *StrongRef `gorm:"embedded;embeddedPrefix:reply_parent_"`
}
type FeedPost_Tag struct {
···
}
if out.Reply != nil {
-
post.Reply = FeedPost_Reply{
-
FeedPostID: post.ID,
-
Parent: StrongRef{
+
post.Reply = &FeedPost_Reply{
+
Parent: &StrongRef{
Uri: out.Reply.Parent.Uri,
Cid: out.Reply.Parent.Cid,
},
-
Root: StrongRef{
+
Root: &StrongRef{
Uri: out.Reply.Root.Uri,
Cid: out.Reply.Root.Cid,
},
+2 -18
models/feed_repost.go
···
ID string `gorm:"primaryKey"`
CreatedAt string
-
-
Subject *StrongRef `gorm:"embedded;embeddedPrefix:subject_"`
-
Via *StrongRef `gorm:"embedded;embeddedPrefix:via_"`
-
-
// SubjectUri string
-
// ViaUri string
+
Subject *StrongRef `gorm:"embedded;embeddedPrefix:subject_"`
+
Via *StrongRef `gorm:"embedded;embeddedPrefix:via_"`
AutoCreatedAt time.Time `gorm:"autoCreateTime"`
AutoUpdatedAt time.Time `gorm:"autoUpdateTime"`
···
return nil
}
-
slog.Info("repost debug", "out", out)
-
repost := FeedRepost{
ID: string(uri),
CreatedAt: out.CreatedAt,
}
if out.Subject != nil {
-
slog.Info("out.Subject is not nil")
repost.Subject = &StrongRef{
Uri: out.Subject.Uri,
Cid: out.Subject.Cid,
}
-
// repost.SubjectUri = out.Subject.Uri
-
} else {
-
slog.Info("out.Subject is nil")
}
if out.Via != nil {
-
slog.Info("out.Via is not nil")
repost.Via = &StrongRef{
Uri: out.Via.Uri,
Cid: out.Via.Cid,
}
-
// repost.ViaUri = out.Via.Uri
-
} else {
-
slog.Info("out.Via is nil")
}
-
slog.Info("final repost", "repost", repost)
-
return &repost
}
+1
models/actor_status.go
···
}
status := ActorStatus{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
DurationMinutes: out.DurationMinutes,
Status: out.Status,
+1
models/feed_generator.go
···
}
feedgen := FeedGenerator{
+
ID: string(uri),
AcceptsInteractions: out.AcceptsInteractions,
ContentMode: out.ContentMode,
CreatedAt: out.CreatedAt,
+1
models/graph_block.go
···
}
return &GraphBlock{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
Subject: out.Subject,
}
+1
models/graph_list.go
···
}
list := GraphList{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
Description: out.Description,
Name: out.Name,
+1
models/graph_listblock.go
···
}
return &GraphListblock{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
Subject: out.Subject,
}
+1
models/graph_listitem.go
···
}
return &GraphListitem{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
List: out.List,
Subject: out.Subject,
+1
models/graph_starterpack.go
···
}
pack := GraphStarterpack{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
Description: out.Description,
List: out.List,
+1
models/graph_verification.go
···
}
verification := GraphVerification{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
DisplayName: out.DisplayName,
Handle: out.Handle,
+1
models/labeler_service.go
···
}
labeler := LabelerService{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
}
-31
cmd/monarch/database.go
···
-
package main
-
-
import (
-
"log"
-
"log/slog"
-
"os"
-
"time"
-
-
"gorm.io/driver/sqlite"
-
"gorm.io/gorm"
-
"gorm.io/gorm/logger"
-
)
-
-
func NewDatabase(path string) *gorm.DB {
-
sl := slog.With("source", "database")
-
l := logger.New(
-
log.New(os.Stdout, "\r\n", log.LstdFlags),
-
logger.Config{
-
SlowThreshold: time.Second,
-
Colorful: false,
-
},
-
)
-
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{
-
Logger: l,
-
})
-
if err != nil {
-
sl.Error("failed to open database", "err", err)
-
}
-
db.Exec("PRAGMA journal_mode=WAL")
-
return db
-
}
+1 -1
cmd/monarch/main.go
···
},
&cli.IntFlag{
Name: "sync-requests-limit",
-
Value: 30,
+
Value: 10, // ratelimit-policy: 3000;w=300
},
}
+4 -20
cmd/monarch/census.go
···
)
type CensusService struct {
-
cursor *CursorService
-
backfill *backfill.Backfiller
-
+
cursor *CursorService
+
backfill *backfill.Backfiller
seenHosts map[string]bool
-
seenLk sync.Mutex
-
-
storeLk sync.Mutex
}
type jobMaker interface {
···
for _, host := range res.Hosts {
// don't reprocess hosts already handled
-
cs.seenLk.Lock()
-
_, ok := cs.seenHosts[host.Hostname]
-
cs.seenLk.Unlock()
-
if ok {
+
seen := cs.seenHosts[host.Hostname]
+
if seen {
slog.Info("already processed host, skipping", "host", host)
continue
}
···
return
}
-
cs.storeLk.Lock()
hcur, err := cs.cursor.GetHostCursor(host)
if err != nil {
slog.Error("error fetching host cursor", "err", err)
}
-
cs.storeLk.Unlock()
var added int
curs := hcur.Cursor
···
continue
}
-
cs.storeLk.Lock()
for _, repo := range res.Repos {
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
if err != nil {
···
added += 1
}
}
-
cs.storeLk.Unlock()
if res.Cursor != nil && *res.Cursor != "" {
curs = *res.Cursor
-
cs.storeLk.Lock()
if err := cs.cursor.SetHostCursor(host, curs); err != nil {
slog.Error("error updating cursor for host", "err", err)
}
-
cs.storeLk.Unlock()
} else {
break
}
}
slog.Info("finished listing repos", "host", host)
-
-
cs.seenLk.Lock()
-
defer cs.seenLk.Unlock()
-
cs.seenHosts[host] = true
}
+1 -4
cmd/monarch/cursors.go
···
import (
"context"
"log/slog"
-
"sync"
"time"
"gorm.io/gorm"
)
type CursorService struct {
-
store *gorm.DB
-
-
firehoseLk sync.Mutex
+
store *gorm.DB
firehoseSeq int64
}