forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "path/filepath" 8 9 "tangled.sh/tangled.sh/core/api/tangled" 10 "tangled.sh/tangled.sh/core/eventconsumer" 11 "tangled.sh/tangled.sh/core/rbac" 12 13 "github.com/bluesky-social/jetstream/pkg/models" 14) 15 16type Ingester func(ctx context.Context, e *models.Event) error 17 18func (s *Spindle) ingest() Ingester { 19 return func(ctx context.Context, e *models.Event) error { 20 var err error 21 defer func() { 22 eventTime := e.TimeUS 23 lastTimeUs := eventTime + 1 24 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 25 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 26 } 27 }() 28 29 if e.Kind != models.EventKindCommit { 30 return nil 31 } 32 33 switch e.Commit.Collection { 34 case tangled.SpindleMemberNSID: 35 s.ingestMember(ctx, e) 36 case tangled.RepoNSID: 37 s.ingestRepo(ctx, e) 38 } 39 40 return err 41 } 42} 43 44func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 45 did := e.Did 46 var err error 47 48 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 49 50 switch e.Commit.Operation { 51 case models.CommitOperationCreate, models.CommitOperationUpdate: 52 raw := e.Commit.Record 53 record := tangled.SpindleMember{} 54 err = json.Unmarshal(raw, &record) 55 if err != nil { 56 l.Error("invalid record", "error", err) 57 return err 58 } 59 60 domain := s.cfg.Server.Hostname 61 if s.cfg.Server.Dev { 62 domain = s.cfg.Server.ListenAddr 63 } 64 recordInstance := record.Instance 65 66 if recordInstance != domain { 67 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 68 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 69 } 70 71 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 72 if err != nil || !ok { 73 l.Error("failed to add member", "did", did, "error", err) 74 return fmt.Errorf("failed to enforce permissions: %w", err) 75 } 76 77 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil { 78 l.Error("failed to add member", "error", err) 79 return fmt.Errorf("failed to add member: %w", err) 80 } 81 l.Info("added member from firehose", "member", record.Subject) 82 83 if err := s.db.AddDid(record.Subject); err != nil { 84 l.Error("failed to add did", "error", err) 85 return fmt.Errorf("failed to add did: %w", err) 86 } 87 s.jc.AddDid(record.Subject) 88 89 return nil 90 91 } 92 return nil 93} 94 95func (s *Spindle) ingestRepo(_ context.Context, e *models.Event) error { 96 var err error 97 98 l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 99 100 l.Info("ingesting repo record") 101 102 switch e.Commit.Operation { 103 case models.CommitOperationCreate, models.CommitOperationUpdate: 104 raw := e.Commit.Record 105 record := tangled.Repo{} 106 err = json.Unmarshal(raw, &record) 107 if err != nil { 108 l.Error("invalid record", "error", err) 109 return err 110 } 111 112 domain := s.cfg.Server.Hostname 113 114 // no spindle configured for this repo 115 if record.Spindle == nil { 116 l.Info("no spindle configured", "did", record.Owner, "name", record.Name) 117 return nil 118 } 119 120 // this repo did not want this spindle 121 if *record.Spindle != domain { 122 l.Info("different spindle configured", "did", record.Owner, "name", record.Name, "spindle", *record.Spindle, "domain", domain) 123 return nil 124 } 125 126 // add this repo to the watch list 127 if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil { 128 l.Error("failed to add repo", "error", err) 129 return fmt.Errorf("failed to add repo: %w", err) 130 } 131 132 // add repo to rbac 133 if err := s.e.AddRepo(record.Owner, rbac.ThisServer, filepath.Join(record.Owner, record.Name)); err != nil { 134 l.Error("failed to add repo to enforcer", "error", err) 135 return fmt.Errorf("failed to add repo: %w", err) 136 } 137 138 // add this knot to the event consumer 139 src := eventconsumer.NewKnotSource(record.Knot) 140 s.ks.AddSource(context.Background(), src) 141 142 return nil 143 144 } 145 return nil 146}