an app.bsky.* indexer

Compare changes

Choose any two refs to compare.

+6
models/strongref.go
···
+
package models
+
+
type StrongRef struct {
+
Uri string
+
Cid string
+
}
+87
models/feed_threadgate.go
···
+
package models
+
+
import (
+
"bytes"
+
"log/slog"
+
"time"
+
+
appbsky "github.com/bluesky-social/indigo/api/bsky"
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
)
+
+
type FeedThreadgate struct {
+
ID string `gorm:"primaryKey"`
+
+
AllowRules []FeedThreadgate_AllowRule
+
CreatedAt string
+
HiddenReplies []FeedThreadgate_HiddenReply
+
Post string
+
+
AutoCreatedAt time.Time `gorm:"autoCreateTime"`
+
AutoUpdatedAt time.Time `gorm:"autoUpdateTime"`
+
}
+
+
type FeedThreadgate_AllowRule struct {
+
FeedThreadgateID string
+
Rule string
+
}
+
+
type FeedThreadgate_HiddenReply struct {
+
FeedThreadgateID string
+
Uri string
+
}
+
+
func NewFeedThreadgate(uri syntax.ATURI, rec []byte) *FeedThreadgate {
+
var out appbsky.FeedThreadgate
+
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
+
slog.Error("could not unmarshal feed threadgate CBOR", "err", err)
+
return nil
+
}
+
+
threadgate := FeedThreadgate{
+
ID: string(uri),
+
CreatedAt: out.CreatedAt,
+
Post: out.Post,
+
}
+
+
for _, hidden := range out.HiddenReplies {
+
threadgate.HiddenReplies = append(threadgate.HiddenReplies, FeedThreadgate_HiddenReply{
+
FeedThreadgateID: threadgate.ID,
+
Uri: hidden,
+
})
+
}
+
+
if out.Allow != nil {
+
for _, rule := range out.Allow {
+
if rule.FeedThreadgate_MentionRule != nil {
+
threadgate.AllowRules = append(threadgate.AllowRules, FeedThreadgate_AllowRule{
+
FeedThreadgateID: threadgate.ID,
+
Rule: rule.FeedThreadgate_MentionRule.LexiconTypeID,
+
})
+
}
+
+
if rule.FeedThreadgate_FollowerRule != nil {
+
threadgate.AllowRules = append(threadgate.AllowRules, FeedThreadgate_AllowRule{
+
FeedThreadgateID: threadgate.ID,
+
Rule: rule.FeedThreadgate_FollowerRule.LexiconTypeID,
+
})
+
}
+
+
if rule.FeedThreadgate_FollowingRule != nil {
+
threadgate.AllowRules = append(threadgate.AllowRules, FeedThreadgate_AllowRule{
+
FeedThreadgateID: threadgate.ID,
+
Rule: rule.FeedThreadgate_FollowingRule.LexiconTypeID,
+
})
+
}
+
+
if rule.FeedThreadgate_ListRule != nil {
+
threadgate.AllowRules = append(threadgate.AllowRules, FeedThreadgate_AllowRule{
+
FeedThreadgateID: threadgate.ID,
+
Rule: rule.FeedThreadgate_ListRule.LexiconTypeID,
+
})
+
}
+
}
+
}
+
+
return &threadgate
+
}
+10 -26
models/actor_profile.go
···
CreatedAt *string
Description *string
DisplayName *string
-
JoinedViaStarterPack []ActorProfile_JoinedViaStarterPack
+
JoinedViaStarterPack *StrongRef `gorm:"embedded;embeddedPrefix:starterpack_"`
Labels []ActorProfile_Label
-
PinnedPost []ActorProfile_PinnedPost
+
PinnedPost *StrongRef `gorm:"embedded;embeddedPrefix:pinnedpost_"`
AutoCreatedAt time.Time `gorm:"autoCreateTime"`
AutoUpdatedAt time.Time `gorm:"autoUpdateTime"`
···
Value string
}
-
type ActorProfile_JoinedViaStarterPack struct {
-
ActorProfileID string
-
StrongRef
-
}
-
-
type ActorProfile_PinnedPost struct {
-
ActorProfileID string
-
StrongRef
-
}
-
func NewActorProfile(uri syntax.ATURI, rec []byte) *ActorProfile {
var out appbsky.ActorProfile
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
···
}
if out.JoinedViaStarterPack != nil {
-
profile.JoinedViaStarterPack = append(profile.JoinedViaStarterPack, ActorProfile_JoinedViaStarterPack{
-
ActorProfileID: profile.ID,
-
StrongRef: StrongRef{
-
Uri: out.JoinedViaStarterPack.Uri,
-
Cid: out.JoinedViaStarterPack.Cid,
-
},
-
})
+
profile.JoinedViaStarterPack = &StrongRef{
+
Uri: out.JoinedViaStarterPack.Uri,
+
Cid: out.JoinedViaStarterPack.Cid,
+
}
}
if out.PinnedPost != nil {
-
profile.PinnedPost = append(profile.PinnedPost, ActorProfile_PinnedPost{
-
ActorProfileID: profile.ID,
-
StrongRef: StrongRef{
-
Uri: out.PinnedPost.Uri,
-
Cid: out.PinnedPost.Cid,
-
},
-
})
+
profile.PinnedPost = &StrongRef{
+
Uri: out.PinnedPost.Uri,
+
Cid: out.PinnedPost.Cid,
+
}
}
if out.Labels != nil && out.Labels.LabelDefs_SelfLabels != nil && out.Labels.LabelDefs_SelfLabels.Values != nil {
+8 -24
models/feed_like.go
···
ID string `gorm:"primaryKey"`
CreatedAt string
-
Subject FeedLike_Subject
-
Via FeedLike_Via
+
Subject *StrongRef `gorm:"embedded;embeddedPrefix:subject_"`
+
Via *StrongRef `gorm:"embedded;embeddedPrefix:via_"`
AutoCreatedAt time.Time `gorm:"autoCreateTime"`
AutoUpdatedAt time.Time `gorm:"autoUpdateTime"`
}
-
type FeedLike_Subject struct {
-
FeedLikeID string
-
StrongRef
-
}
-
-
type FeedLike_Via struct {
-
FeedLikeID string
-
StrongRef
-
}
-
func NewFeedLike(uri syntax.ATURI, rec []byte) *FeedLike {
var out appbsky.FeedLike
if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil {
···
}
if out.Subject != nil {
-
like.Subject = FeedLike_Subject{
-
FeedLikeID: like.ID,
-
StrongRef: StrongRef{
-
Uri: out.Subject.Uri,
-
Cid: out.Subject.Cid,
-
},
+
like.Subject = &StrongRef{
+
Uri: out.Subject.Uri,
+
Cid: out.Subject.Cid,
}
}
if out.Via != nil {
-
like.Via = FeedLike_Via{
-
FeedLikeID: like.ID,
-
StrongRef: StrongRef{
-
Uri: out.Via.Uri,
-
Cid: out.Via.Cid,
-
},
+
like.Via = &StrongRef{
+
Uri: out.Via.Uri,
+
Cid: out.Via.Cid,
}
}
+2 -2
models/feed_postgate.go
···
type FeedPostgate_EmbeddingRule struct {
FeedPostgateID string
-
DisableRule string
+
Rule string
}
func NewFeedPostgate(uri syntax.ATURI, rec []byte) *FeedPostgate {
···
if rule.FeedPostgate_DisableRule != nil {
postgate.EmbeddingRules = append(postgate.EmbeddingRules, FeedPostgate_EmbeddingRule{
FeedPostgateID: postgate.ID,
-
DisableRule: rule.FeedPostgate_DisableRule.LexiconTypeID,
+
Rule: rule.FeedPostgate_DisableRule.LexiconTypeID,
})
}
}
+1
models/graph_follow.go
···
}
return &GraphFollow{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
Subject: out.Subject,
}
+1
models/graph_list.go
···
}
list := GraphList{
+
ID: string(uri),
CreatedAt: out.CreatedAt,
Description: out.Description,
Name: out.Name,
-31
cmd/monarch/database.go
···
-
package main
-
-
import (
-
"log"
-
"log/slog"
-
"os"
-
"time"
-
-
"gorm.io/driver/sqlite"
-
"gorm.io/gorm"
-
"gorm.io/gorm/logger"
-
)
-
-
func NewDatabase(path string) *gorm.DB {
-
sl := slog.With("source", "database")
-
l := logger.New(
-
log.New(os.Stdout, "\r\n", log.LstdFlags),
-
logger.Config{
-
SlowThreshold: time.Second,
-
Colorful: false,
-
},
-
)
-
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{
-
Logger: l,
-
})
-
if err != nil {
-
sl.Error("failed to open database", "err", err)
-
}
-
db.Exec("PRAGMA journal_mode=WAL")
-
return db
-
}
+1 -1
go.mod
···
github.com/gorilla/websocket v1.5.1
github.com/ipfs/go-cid v0.4.1
github.com/urfave/cli/v2 v2.25.7
+
golang.org/x/sync v0.7.0
gorm.io/gorm v1.25.9
)
···
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.23.0 // indirect
-
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
+20
cmd/monarch/handlers.go
···
"context"
"encoding/json"
"fmt"
+
"log/slog"
appbsky "github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/atproto/syntax"
···
var out appbsky.LabelerService
out.UnmarshalCBOR(bytes.NewReader(*rec))
body, err = json.Marshal(out)
+
+
case syntax.NSID("app.bsky.graph.list"):
+
var out appbsky.GraphList
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
+
body, err = json.Marshal(out)
+
+
case syntax.NSID("app.bsky.graph.verification"):
+
var out appbsky.GraphVerification
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
+
body, err = json.Marshal(out)
+
+
case syntax.NSID("app.bsky.graph.starterpack"):
+
var out appbsky.GraphStarterpack
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
+
body, err = json.Marshal(out)
+
+
default:
+
slog.Error("tracked collection missing handler", "collection", uri.Collection())
+
return nil
}
switch action {
+1 -1
cmd/monarch/backfill.go
···
ParallelBackfills: cctx.Int("backfill-workers"),
ParallelRecordCreates: cctx.Int("backfill-consumers"),
NSIDFilter: "",
-
SyncRequestsPerSecond: 10,
+
SyncRequestsPerSecond: cctx.Int("sync-requests-limit"),
RelayHost: "https://" + cctx.String("relay-host"),
}