forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 8 "tangled.sh/tangled.sh/core/api/tangled" 9 "tangled.sh/tangled.sh/core/eventconsumer" 10 11 "github.com/bluesky-social/jetstream/pkg/models" 12) 13 14type Ingester func(ctx context.Context, e *models.Event) error 15 16func (s *Spindle) ingest() Ingester { 17 return func(ctx context.Context, e *models.Event) error { 18 var err error 19 defer func() { 20 eventTime := e.TimeUS 21 lastTimeUs := eventTime + 1 22 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 23 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 24 } 25 }() 26 27 if e.Kind != models.EventKindCommit { 28 return nil 29 } 30 31 switch e.Commit.Collection { 32 case tangled.SpindleMemberNSID: 33 s.ingestMember(ctx, e) 34 case tangled.RepoNSID: 35 s.ingestRepo(ctx, e) 36 } 37 38 return err 39 } 40} 41 42func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 43 did := e.Did 44 var err error 45 46 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 47 48 switch e.Commit.Operation { 49 case models.CommitOperationCreate, models.CommitOperationUpdate: 50 raw := e.Commit.Record 51 record := tangled.SpindleMember{} 52 err = json.Unmarshal(raw, &record) 53 if err != nil { 54 l.Error("invalid record", "error", err) 55 return err 56 } 57 58 domain := s.cfg.Server.Hostname 59 if s.cfg.Server.Dev { 60 domain = s.cfg.Server.ListenAddr 61 } 62 recordInstance := record.Instance 63 64 if recordInstance != domain { 65 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 66 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 67 } 68 69 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 70 if err != nil || !ok { 71 l.Error("failed to add member", "did", did, "error", err) 72 return fmt.Errorf("failed to enforce permissions: %w", err) 73 } 74 75 if err := s.e.AddKnotMember(rbacDomain, record.Subject); err != nil { 76 l.Error("failed to add member", "error", err) 77 return fmt.Errorf("failed to add member: %w", err) 78 } 79 l.Info("added member from firehose", "member", record.Subject) 80 81 if err := s.db.AddDid(record.Subject); err != nil { 82 l.Error("failed to add did", "error", err) 83 return fmt.Errorf("failed to add did: %w", err) 84 } 85 s.jc.AddDid(record.Subject) 86 87 return nil 88 89 } 90 return nil 91} 92 93func (s *Spindle) ingestRepo(_ context.Context, e *models.Event) error { 94 var err error 95 96 l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 97 98 l.Info("ingesting repo record") 99 100 switch e.Commit.Operation { 101 case models.CommitOperationCreate, models.CommitOperationUpdate: 102 raw := e.Commit.Record 103 record := tangled.Repo{} 104 err = json.Unmarshal(raw, &record) 105 if err != nil { 106 l.Error("invalid record", "error", err) 107 return err 108 } 109 110 domain := s.cfg.Server.Hostname 111 112 // no spindle configured for this repo 113 if record.Spindle == nil { 114 l.Info("no spindle configured", "did", record.Owner, "name", record.Name) 115 return nil 116 } 117 118 // this repo did not want this spindle 119 if *record.Spindle != domain { 120 l.Info("different spindle configured", "did", record.Owner, "name", record.Name, "spindle", *record.Spindle, "domain", domain) 121 return nil 122 } 123 124 // add this repo to the watch list 125 if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil { 126 l.Error("failed to add repo", "error", err) 127 return fmt.Errorf("failed to add repo: %w", err) 128 } 129 130 // add this knot to the event consumer 131 src := eventconsumer.NewKnotSource(record.Knot) 132 s.ks.AddSource(context.Background(), src) 133 134 return nil 135 136 } 137 return nil 138}