forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 8 "github.com/bluesky-social/jetstream/pkg/models" 9 "tangled.sh/tangled.sh/core/api/tangled" 10) 11 12type Ingester func(ctx context.Context, e *models.Event) error 13 14func (s *Spindle) ingest() Ingester { 15 return func(ctx context.Context, e *models.Event) error { 16 var err error 17 defer func() { 18 eventTime := e.TimeUS 19 lastTimeUs := eventTime + 1 20 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 21 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 22 } 23 }() 24 25 if e.Kind != models.EventKindCommit { 26 return nil 27 } 28 29 switch e.Commit.Collection { 30 case tangled.SpindleMemberNSID: 31 s.ingestMember(ctx, e) 32 } 33 34 return err 35 } 36} 37 38func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 39 did := e.Did 40 var err error 41 42 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 43 44 switch e.Commit.Operation { 45 case models.CommitOperationCreate, models.CommitOperationUpdate: 46 raw := e.Commit.Record 47 record := tangled.SpindleMember{} 48 err = json.Unmarshal(raw, &record) 49 if err != nil { 50 l.Error("invalid record", "error", err) 51 return err 52 } 53 54 domain := s.cfg.Server.Hostname 55 if s.cfg.Server.Dev { 56 domain = s.cfg.Server.ListenAddr 57 } 58 recordInstance := *record.Instance 59 60 if recordInstance != domain { 61 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 62 return fmt.Errorf("domain mismatch: %s != %s", *record.Instance, domain) 63 } 64 65 ok, err := s.e.E.Enforce(did, rbacDomain, rbacDomain, "server:invite") 66 if err != nil || !ok { 67 l.Error("failed to add member", "did", did) 68 return fmt.Errorf("failed to enforce permissions: %w", err) 69 } 70 71 if err := s.e.AddMember(rbacDomain, record.Subject); err != nil { 72 l.Error("failed to add member", "error", err) 73 return fmt.Errorf("failed to add member: %w", err) 74 } 75 l.Info("added member from firehose", "member", record.Subject) 76 77 if err := s.db.AddDid(did); err != nil { 78 l.Error("failed to add did", "error", err) 79 return fmt.Errorf("failed to add did: %w", err) 80 } 81 s.jc.AddDid(did) 82 83 return nil 84 85 } 86 return nil 87}