forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 8 "tangled.sh/tangled.sh/core/api/tangled" 9 "tangled.sh/tangled.sh/core/knotclient" 10 11 "github.com/bluesky-social/jetstream/pkg/models" 12) 13 14type Ingester func(ctx context.Context, e *models.Event) error 15 16func (s *Spindle) ingest() Ingester { 17 return func(ctx context.Context, e *models.Event) error { 18 var err error 19 defer func() { 20 eventTime := e.TimeUS 21 lastTimeUs := eventTime + 1 22 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 23 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 24 } 25 }() 26 27 if e.Kind != models.EventKindCommit { 28 return nil 29 } 30 31 switch e.Commit.Collection { 32 case tangled.SpindleMemberNSID: 33 s.ingestMember(ctx, e) 34 case tangled.RepoNSID: 35 s.ingestRepo(ctx, e) 36 } 37 38 return err 39 } 40} 41 42func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 43 did := e.Did 44 var err error 45 46 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 47 48 switch e.Commit.Operation { 49 case models.CommitOperationCreate, models.CommitOperationUpdate: 50 raw := e.Commit.Record 51 record := tangled.SpindleMember{} 52 err = json.Unmarshal(raw, &record) 53 if err != nil { 54 l.Error("invalid record", "error", err) 55 return err 56 } 57 58 domain := s.cfg.Server.Hostname 59 if s.cfg.Server.Dev { 60 domain = s.cfg.Server.ListenAddr 61 } 62 recordInstance := *record.Instance 63 64 if recordInstance != domain { 65 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 66 return fmt.Errorf("domain mismatch: %s != %s", *record.Instance, domain) 67 } 68 69 ok, err := s.e.E.Enforce(did, rbacDomain, rbacDomain, "server:invite") 70 if err != nil || !ok { 71 l.Error("failed to add member", "did", did) 72 return fmt.Errorf("failed to enforce permissions: %w", err) 73 } 74 75 if err := s.e.AddKnotMember(rbacDomain, record.Subject); err != nil { 76 l.Error("failed to add member", "error", err) 77 return fmt.Errorf("failed to add member: %w", err) 78 } 79 l.Info("added member from firehose", "member", record.Subject) 80 81 if err := s.db.AddDid(did); err != nil { 82 l.Error("failed to add did", "error", err) 83 return fmt.Errorf("failed to add did: %w", err) 84 } 85 s.jc.AddDid(did) 86 87 return nil 88 89 } 90 return nil 91} 92 93func (s *Spindle) ingestRepo(_ context.Context, e *models.Event) error { 94 var err error 95 96 l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 97 98 switch e.Commit.Operation { 99 case models.CommitOperationCreate, models.CommitOperationUpdate: 100 raw := e.Commit.Record 101 record := tangled.Repo{} 102 err = json.Unmarshal(raw, &record) 103 if err != nil { 104 l.Error("invalid record", "error", err) 105 return err 106 } 107 108 domain := s.cfg.Server.Hostname 109 if s.cfg.Server.Dev { 110 domain = s.cfg.Server.ListenAddr 111 } 112 113 // no spindle configured for this repo 114 if record.Spindle == nil { 115 return nil 116 } 117 118 // this repo did not want this spindle 119 if *record.Spindle != domain { 120 return nil 121 } 122 123 // add this repo to the watch list 124 if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil { 125 l.Error("failed to add repo", "error", err) 126 return fmt.Errorf("failed to add repo: %w", err) 127 } 128 129 // add this knot to the event consumer 130 src := knotclient.NewEventSource(record.Knot) 131 s.ks.AddSource(context.Background(), src) 132 133 return nil 134 135 } 136 return nil 137}