From 5e73c6561665e34f487d57073af8f320bd6a4902 Mon Sep 17 00:00:00 2001
From: Anirudh Oppiliappan
Date: Sat, 14 Jun 2025 10:43:12 +0300
Subject: [PATCH] spindle: setup jetstream ingester

Change-Id: ursoqorztvwwutylzyotooqsrrmzoxos
Signed-off-by: Anirudh Oppiliappan
---
 spindle/config/config.go |   4 +-
 spindle/ingester.go      | 117 +++++++++++++++++++++++++++++++++++++++
 spindle/server.go        |  90 +++++++++++++++++++++++-------
 3 files changed, 186 insertions(+), 25 deletions(-)
 create mode 100644 spindle/ingester.go

diff --git a/spindle/config/config.go b/spindle/config/config.go
index 2532e34..bafa2a0 100644
--- a/spindle/config/config.go
+++ b/spindle/config/config.go
@@ -12,10 +12,12 @@ type Server struct {
 	Hostname          string `env:"HOSTNAME, required"`
 	JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"`
 	Dev               bool   `env:"DEV, default=false"`
+	Owner             string `env:"OWNER, required"`
 }
 
 type Config struct {
-	Server Server `env:",prefix=SPINDLE_SERVER_"`
+	Server Server   `env:",prefix=SPINDLE_SERVER_"`
+	Knots  []string `env:"SPINDLE_SUBSCRIBED_KNOTS,required"`
 }
 
 func Load(ctx context.Context) (*Config, error) {
diff --git a/spindle/ingester.go b/spindle/ingester.go
new file mode 100644
index 0000000..59ecdf3
--- /dev/null
+++ b/spindle/ingester.go
@@ -0,0 +1,117 @@
+package spindle
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+
+	"github.com/bluesky-social/jetstream/pkg/models"
+	"tangled.sh/tangled.sh/core/api/tangled"
+)
+
+// Ingester handles a single jetstream event.
+type Ingester func(ctx context.Context, e *models.Event) error
+
+// ingest builds the Ingester that Run passes to jc.StartJetstream.
+//
+// Every event, whatever its kind, has e.TimeUS + 1 recorded through
+// s.db.SaveLastTimeUs after it has been handled. Commit events are dispatched
+// on their collection; tangled.SpindleMemberNSID is the only one handled.
+func (s *Spindle) ingest() Ingester {
+	l := s.l.With("component", "ingester")
+
+	return func(ctx context.Context, e *models.Event) error {
+		defer func() {
+			// A deferred call cannot change an unnamed result, so a
+			// failed save is logged here instead of being dropped.
+			if err := s.db.SaveLastTimeUs(e.TimeUS + 1); err != nil {
+				l.Error("failed to save last time us", "error", err)
+			}
+		}()
+
+		if e.Kind != models.EventKindCommit {
+			return nil
+		}
+
+		switch e.Commit.Collection {
+		case tangled.SpindleMemberNSID:
+			// ingestMember logs every failure itself, so the error is
+			// only discarded here, not hidden.
+			_ = s.ingestMember(ctx, e)
+		}
+
+		// NOTE(review): this handler has always returned nil; how the
+		// jetstream client reacts to a non-nil error is not visible
+		// here, so failures are logged instead of returned. Confirm
+		// before propagating them.
+		return nil
+	}
+}
+
+// ingestMember handles a commit on the tangled.SpindleMemberNSID collection.
+//
+// Create and update operations decode the record, require its instance to
+// match this server (Server.Hostname, or Server.ListenAddr when Server.Dev is
+// set), require the event's DID to hold "server:invite" on rbacDomain, and
+// then add record.Subject as a member. All other operations are ignored.
+// Each failure is logged and returned.
+func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
+	switch e.Commit.Operation {
+	case models.CommitOperationCreate, models.CommitOperationUpdate:
+	default:
+		return nil
+	}
+
+	did := e.Did
+	l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
+
+	record := tangled.SpindleMember{}
+	if err := json.Unmarshal(e.Commit.Record, &record); err != nil {
+		l.Error("invalid record", "error", err)
+		return err
+	}
+
+	domain := s.cfg.Server.Hostname
+	if s.cfg.Server.Dev {
+		domain = s.cfg.Server.ListenAddr
+	}
+
+	// Instance is a pointer: reject a record that omits it instead of
+	// dereferencing nil.
+	if record.Instance == nil {
+		l.Error("record has no instance", "expected", domain)
+		return fmt.Errorf("domain mismatch: record has no instance, expected %s", domain)
+	}
+	if *record.Instance != domain {
+		l.Error("domain mismatch", "domain", *record.Instance, "expected", domain)
+		return fmt.Errorf("domain mismatch: %s != %s", *record.Instance, domain)
+	}
+
+	// A failed check and a denied check are reported separately so that
+	// a nil error is never wrapped with %w.
+	ok, err := s.e.E.Enforce(did, rbacDomain, rbacDomain, "server:invite")
+	if err != nil {
+		l.Error("failed to add member", "did", did, "error", err)
+		return fmt.Errorf("failed to enforce permissions: %w", err)
+	}
+	if !ok {
+		l.Error("failed to add member", "did", did)
+		return fmt.Errorf("failed to enforce permissions: %s lacks server:invite", did)
+	}
+
+	if err := s.e.AddMember(rbacDomain, record.Subject); err != nil {
+		l.Error("failed to add member", "error", err)
+		return fmt.Errorf("failed to add member: %w", err)
+	}
+	l.Info("added member from firehose", "member", record.Subject)
+
+	// NOTE(review): the DID registered below is the event's author, not
+	// record.Subject (the member just added) — confirm this is intended.
+	if err := s.db.AddDid(did); err != nil {
+		l.Error("failed to add did", "error", err)
+		return fmt.Errorf("failed to add did: %w", err)
+	}
+	s.jc.AddDid(did)
+
+	return nil
+}
diff --git a/spindle/server.go b/spindle/server.go
index 6a0d0ff..f6b351a 100644
--- a/spindle/server.go
+++ b/spindle/server.go
@@ -22,6 +22,10 @@ import (
 	"tangled.sh/tangled.sh/core/spindle/queue"
 )
 
+const (
+	rbacDomain = "thisserver"
+)
+
 type Spindle struct {
 	jc  *jetstream.JetstreamClient
 	db  *db.DB
@@ -30,9 +34,12 @@ type Spindle struct {
 	n   *notifier.Notifier
 	eng *engine.Engine
 	jq  *queue.Queue
+	cfg *config.Config
 }
 
 func Run(ctx context.Context) error {
+	logger := log.FromContext(ctx)
+
 	cfg, err := config.Load(ctx)
 	if err != nil {
 		return fmt.Errorf("failed to load config: %w", err)
@@ -47,16 +54,10 @@ func Run(ctx context.Context) error {
 	if err != nil {
 		return fmt.Errorf("failed to setup rbac enforcer: %w", err)
 	}
-
-	logger := log.FromContext(ctx)
-
-	collections := []string{tangled.SpindleMemberNSID}
-	jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false)
-	if err != nil {
-		return fmt.Errorf("failed to setup jetstream client: %w", err)
-	}
+	e.E.EnableAutoSave(true)
 
 	n := notifier.New()
+
 	eng, err := engine.New(ctx, d, &n)
 	if err != nil {
 		return err
@@ -64,9 +65,11 @@ func Run(ctx context.Context) error {
 
 	jq := queue.NewQueue(100, 2)
 
-	// starts a job queue runner in the background
-	jq.Start()
-	defer jq.Stop()
+	collections := []string{tangled.SpindleMemberNSID}
+	jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, false, false)
+	if err != nil {
+		return fmt.Errorf("failed to setup jetstream client: %w", err)
+	}
 
 	spindle := Spindle{
 		jc:  jc,
@@ -76,28 +79,50 @@ func Run(ctx context.Context) error {
 		n:   &n,
 		eng: eng,
 		jq:  jq,
+		cfg: cfg,
 	}
 
-	// for each incoming sh.tangled.pipeline, we execute
-	// spindle.processPipeline, which in turn enqueues the pipeline
-	// job in the above registered queue.
+	err = e.AddDomain(rbacDomain)
+	if err != nil {
+		return fmt.Errorf("failed to set rbac domain: %w", err)
+	}
+	err = spindle.configureOwner()
+	if err != nil {
+		return err
+	}
+	logger.Info("owner set", "did", cfg.Server.Owner)
+
+	// starts a job queue runner in the background
+	jq.Start()
+	defer jq.Stop()
+
 	cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
 	if err != nil {
 		return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
 	}
 
-	go func() {
-		logger.Info("starting event consumer")
-		knotEventSource := knotclient.NewEventSource("localhost:6000")
-		ccfg := knotclient.NewConsumerConfig()
-		ccfg.Logger = logger
-		ccfg.Dev = cfg.Server.Dev
-		ccfg.ProcessFunc = spindle.processPipeline
-		ccfg.CursorStore = cursorStore
-		ccfg.AddEventSource(knotEventSource)
+	err = jc.StartJetstream(ctx, spindle.ingest())
+	if err != nil {
+		return fmt.Errorf("failed to start jetstream consumer: %w", err)
+	}
+
+	// for each incoming sh.tangled.pipeline, we execute
+	// spindle.processPipeline, which in turn enqueues the pipeline
+	// job in the above registered queue.
 
-		ec := knotclient.NewEventConsumer(*ccfg)
+	ccfg := knotclient.NewConsumerConfig()
+	ccfg.Logger = logger
+	ccfg.Dev = cfg.Server.Dev
+	ccfg.ProcessFunc = spindle.processPipeline
+	ccfg.CursorStore = cursorStore
+	for _, knot := range spindle.cfg.Knots {
+		kes := knotclient.NewEventSource(knot)
+		ccfg.AddEventSource(kes)
+	}
+	ec := knotclient.NewEventConsumer(*ccfg)
 
+	go func() {
+		logger.Info("starting knot event consumer", "knots", spindle.cfg.Knots)
 		ec.Start(ctx)
 	}()
 
@@ -159,3 +184,20 @@ func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSourc
 
 	return nil
 }
+
+func (s *Spindle) configureOwner() error {
+	cfgOwner := s.cfg.Server.Owner
+	serverOwner, err := s.e.GetUserByRole("server:owner", rbacDomain)
+	if err != nil {
+		return fmt.Errorf("failed to fetch server:owner: %w", err)
+	}
+
+	if len(serverOwner) == 0 {
+		s.e.AddOwner(rbacDomain, cfgOwner)
+	} else {
+		if serverOwner[0] != cfgOwner {
+			return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0])
+		}
+	}
+	return nil
+}
-- 
2.43.0