spindle: setup jetstream ingester #245

merged
opened by anirudh.fi targeting master from push-tlxunysxvxwk
Changed files
+156 -25
spindle
+3 -1
spindle/config/config.go
···
Hostname string `env:"HOSTNAME, required"`
JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"`
Dev bool `env:"DEV, default=false"`
+
Owner string `env:"OWNER, required"`
}
type Config struct {
-
Server Server `env:",prefix=SPINDLE_SERVER_"`
+
Server Server `env:",prefix=SPINDLE_SERVER_"`
+
Knots []string `env:"SPINDLE_SUBSCRIBED_KNOTS,required"`
}
func Load(ctx context.Context) (*Config, error) {
+87
spindle/ingester.go
···
+
package spindle
+
+
import (
+
"context"
+
"encoding/json"
+
"fmt"
+
+
"github.com/bluesky-social/jetstream/pkg/models"
+
"tangled.sh/tangled.sh/core/api/tangled"
+
)
+
+
// Ingester is a callback that handles a single jetstream event and reports
// any processing failure through its error result.
type Ingester func(ctx context.Context, e *models.Event) error
+
+
func (s *Spindle) ingest() Ingester {
+
return func(ctx context.Context, e *models.Event) error {
+
var err error
+
defer func() {
+
eventTime := e.TimeUS
+
lastTimeUs := eventTime + 1
+
if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil {
+
err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
+
}
+
}()
+
+
if e.Kind != models.EventKindCommit {
+
return nil
+
}
+
+
switch e.Commit.Collection {
+
case tangled.SpindleMemberNSID:
+
s.ingestMember(ctx, e)
+
}
+
+
return err
+
}
+
}
+
+
func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
+
did := e.Did
+
var err error
+
+
l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
+
+
switch e.Commit.Operation {
+
case models.CommitOperationCreate, models.CommitOperationUpdate:
+
raw := e.Commit.Record
+
record := tangled.SpindleMember{}
+
err = json.Unmarshal(raw, &record)
+
if err != nil {
+
l.Error("invalid record", "error", err)
+
return err
+
}
+
+
domain := s.cfg.Server.Hostname
+
if s.cfg.Server.Dev {
+
domain = s.cfg.Server.ListenAddr
+
}
+
recordInstance := *record.Instance
+
+
if recordInstance != domain {
+
l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
+
return fmt.Errorf("domain mismatch: %s != %s", *record.Instance, domain)
+
}
+
+
ok, err := s.e.E.Enforce(did, rbacDomain, rbacDomain, "server:invite")
+
if err != nil || !ok {
+
l.Error("failed to add member", "did", did)
+
return fmt.Errorf("failed to enforce permissions: %w", err)
+
}
+
+
if err := s.e.AddMember(rbacDomain, record.Subject); err != nil {
+
l.Error("failed to add member", "error", err)
+
return fmt.Errorf("failed to add member: %w", err)
+
}
+
l.Info("added member from firehose", "member", record.Subject)
+
+
if err := s.db.AddDid(did); err != nil {
+
l.Error("failed to add did", "error", err)
+
return fmt.Errorf("failed to add did: %w", err)
+
}
+
s.jc.AddDid(did)
+
+
return nil
+
+
}
+
return nil
+
}
+66 -24
spindle/server.go
···
"tangled.sh/tangled.sh/core/spindle/queue"
)
+
// rbacDomain is the domain string this server passes to the RBAC enforcer.
const (
+
rbacDomain = "thisserver"
+
)
+
type Spindle struct {
jc *jetstream.JetstreamClient
db *db.DB
···
n *notifier.Notifier
eng *engine.Engine
jq *queue.Queue
+
cfg *config.Config
}
func Run(ctx context.Context) error {
+
logger := log.FromContext(ctx)
+
cfg, err := config.Load(ctx)
if err != nil {
return fmt.Errorf("failed to load config: %w", err)
···
if err != nil {
return fmt.Errorf("failed to setup rbac enforcer: %w", err)
}
-
-
logger := log.FromContext(ctx)
-
-
collections := []string{tangled.SpindleMemberNSID}
-
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false)
-
if err != nil {
-
return fmt.Errorf("failed to setup jetstream client: %w", err)
-
}
+
e.E.EnableAutoSave(true)
n := notifier.New()
+
eng, err := engine.New(ctx, d, &n)
if err != nil {
return err
···
jq := queue.NewQueue(100, 2)
-
// starts a job queue runner in the background
-
jq.Start()
-
defer jq.Stop()
+
collections := []string{tangled.SpindleMemberNSID}
+
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, false, false)
+
if err != nil {
+
return fmt.Errorf("failed to setup jetstream client: %w", err)
+
}
spindle := Spindle{
jc: jc,
···
n: &n,
eng: eng,
jq: jq,
+
cfg: cfg,
}
-
// for each incoming sh.tangled.pipeline, we execute
-
// spindle.processPipeline, which in turn enqueues the pipeline
-
// job in the above registered queue.
+
err = e.AddDomain(rbacDomain)
+
if err != nil {
+
return fmt.Errorf("failed to set rbac domain: %w", err)
+
}
+
err = spindle.configureOwner()
+
if err != nil {
+
return err
+
}
+
logger.Info("owner set", "did", cfg.Server.Owner)
+
+
// starts a job queue runner in the background
+
jq.Start()
+
defer jq.Stop()
+
cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
if err != nil {
return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
}
-
go func() {
-
logger.Info("starting event consumer")
-
knotEventSource := knotclient.NewEventSource("localhost:6000")
-
ccfg := knotclient.NewConsumerConfig()
-
ccfg.Logger = logger
-
ccfg.Dev = cfg.Server.Dev
-
ccfg.ProcessFunc = spindle.processPipeline
-
ccfg.CursorStore = cursorStore
-
ccfg.AddEventSource(knotEventSource)
+
err = jc.StartJetstream(ctx, spindle.ingest())
+
if err != nil {
+
return fmt.Errorf("failed to start jetstream consumer: %w", err)
+
}
+
+
// for each incoming sh.tangled.pipeline, we execute
+
// spindle.processPipeline, which in turn enqueues the pipeline
+
// job in the above registered queue.
-
ec := knotclient.NewEventConsumer(*ccfg)
+
ccfg := knotclient.NewConsumerConfig()
+
ccfg.Logger = logger
+
ccfg.Dev = cfg.Server.Dev
+
ccfg.ProcessFunc = spindle.processPipeline
+
ccfg.CursorStore = cursorStore
+
for _, knot := range spindle.cfg.Knots {
+
kes := knotclient.NewEventSource(knot)
+
ccfg.AddEventSource(kes)
+
}
+
ec := knotclient.NewEventConsumer(*ccfg)
+
go func() {
+
logger.Info("starting knot event consumer", "knots", spindle.cfg.Knots)
ec.Start(ctx)
}()
···
return nil
}
+
+
func (s *Spindle) configureOwner() error {
+
cfgOwner := s.cfg.Server.Owner
+
serverOwner, err := s.e.GetUserByRole("server:owner", rbacDomain)
+
if err != nil {
+
return fmt.Errorf("failed to fetch server:owner: %w", err)
+
}
+
+
if len(serverOwner) == 0 {
+
s.e.AddOwner(rbacDomain, cfgOwner)
+
} else {
+
if serverOwner[0] != cfgOwner {
+
return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0])
+
}
+
}
+
return nil
+
}