forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 9 "tangled.sh/tangled.sh/core/api/tangled" 10 "tangled.sh/tangled.sh/core/eventconsumer" 11 "tangled.sh/tangled.sh/core/idresolver" 12 "tangled.sh/tangled.sh/core/rbac" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/identity" 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "github.com/bluesky-social/indigo/xrpc" 18 "github.com/bluesky-social/jetstream/pkg/models" 19 securejoin "github.com/cyphar/filepath-securejoin" 20) 21 22type Ingester func(ctx context.Context, e *models.Event) error 23 24func (s *Spindle) ingest() Ingester { 25 return func(ctx context.Context, e *models.Event) error { 26 var err error 27 defer func() { 28 eventTime := e.TimeUS 29 lastTimeUs := eventTime + 1 30 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 31 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 32 } 33 }() 34 35 if e.Kind != models.EventKindCommit { 36 return nil 37 } 38 39 switch e.Commit.Collection { 40 case tangled.SpindleMemberNSID: 41 s.ingestMember(ctx, e) 42 case tangled.RepoNSID: 43 s.ingestRepo(ctx, e) 44 case tangled.RepoCollaboratorNSID: 45 s.ingestCollaborator(ctx, e) 46 } 47 48 return err 49 } 50} 51 52func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 53 did := e.Did 54 var err error 55 56 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 57 58 switch e.Commit.Operation { 59 case models.CommitOperationCreate, models.CommitOperationUpdate: 60 raw := e.Commit.Record 61 record := tangled.SpindleMember{} 62 err = json.Unmarshal(raw, &record) 63 if err != nil { 64 l.Error("invalid record", "error", err) 65 return err 66 } 67 68 domain := s.cfg.Server.Hostname 69 if s.cfg.Server.Dev { 70 domain = s.cfg.Server.ListenAddr 71 } 72 recordInstance := record.Instance 73 74 if recordInstance != domain { 75 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 76 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 77 } 78 79 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 80 if err != nil || !ok { 81 l.Error("failed to add member", "did", did, "error", err) 82 return fmt.Errorf("failed to enforce permissions: %w", err) 83 } 84 85 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil { 86 l.Error("failed to add member", "error", err) 87 return fmt.Errorf("failed to add member: %w", err) 88 } 89 l.Info("added member from firehose", "member", record.Subject) 90 91 if err := s.db.AddDid(record.Subject); err != nil { 92 l.Error("failed to add did", "error", err) 93 return fmt.Errorf("failed to add did: %w", err) 94 } 95 s.jc.AddDid(record.Subject) 96 97 return nil 98 99 } 100 return nil 101} 102 103func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 104 var err error 105 did := e.Did 106 resolver := idresolver.DefaultResolver() 107 108 l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 109 110 l.Info("ingesting repo record") 111 112 switch e.Commit.Operation { 113 case models.CommitOperationCreate, models.CommitOperationUpdate: 114 raw := e.Commit.Record 115 record := tangled.Repo{} 116 err = json.Unmarshal(raw, &record) 117 if err != nil { 118 l.Error("invalid record", "error", err) 119 return err 120 } 121 122 domain := s.cfg.Server.Hostname 123 124 // no spindle configured for this repo 125 if record.Spindle == nil { 126 l.Info("no spindle configured", "did", record.Owner, "name", record.Name) 127 return nil 128 } 129 130 // this repo did not want this spindle 131 if *record.Spindle != domain { 132 l.Info("different spindle configured", "did", record.Owner, "name", record.Name, "spindle", *record.Spindle, "domain", domain) 133 return nil 134 } 135 136 // add this repo to the watch list 137 if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil { 138 l.Error("failed to add repo", "error", err) 139 return fmt.Errorf("failed to add repo: %w", err) 140 } 141 142 didSlashRepo, err := securejoin.SecureJoin(record.Owner, record.Name) 143 if err != nil { 144 return err 145 } 146 147 // add repo to rbac 148 if err := s.e.AddRepo(record.Owner, rbac.ThisServer, didSlashRepo); err != nil { 149 l.Error("failed to add repo to enforcer", "error", err) 150 return fmt.Errorf("failed to add repo: %w", err) 151 } 152 153 // add collaborators to rbac 154 owner, err := resolver.ResolveIdent(ctx, did) 155 if err != nil || owner.Handle.IsInvalidHandle() { 156 return err 157 } 158 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil { 159 return err 160 } 161 162 // add this knot to the event consumer 163 src := eventconsumer.NewKnotSource(record.Knot) 164 s.ks.AddSource(context.Background(), src) 165 166 return nil 167 168 } 169 return nil 170} 171 172func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 173 var err error 174 175 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 176 177 l.Info("ingesting collaborator record") 178 179 switch e.Commit.Operation { 180 case models.CommitOperationCreate, models.CommitOperationUpdate: 181 raw := e.Commit.Record 182 record := tangled.RepoCollaborator{} 183 err = json.Unmarshal(raw, &record) 184 if err != nil { 185 l.Error("invalid record", "error", err) 186 return err 187 } 188 189 resolver := idresolver.DefaultResolver() 190 191 subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 192 if err != nil || subjectId.Handle.IsInvalidHandle() { 193 return err 194 } 195 196 repoAt, err := syntax.ParseATURI(record.Repo) 197 if err != nil { 198 l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 199 return nil 200 } 201 202 // TODO: get rid of this entirely 203 // resolve this aturi to extract the repo record 204 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 205 if err != nil || owner.Handle.IsInvalidHandle() { 206 return fmt.Errorf("failed to resolve handle: %w", err) 207 } 208 209 xrpcc := xrpc.Client{ 210 Host: owner.PDSEndpoint(), 211 } 212 213 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 214 if err != nil { 215 return err 216 } 217 218 repo := resp.Value.Val.(*tangled.Repo) 219 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 220 221 // check perms for this user 222 if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 223 return fmt.Errorf("insufficient permissions: %w", err) 224 } 225 226 // add collaborator to rbac 227 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 228 l.Error("failed to add repo to enforcer", "error", err) 229 return fmt.Errorf("failed to add repo: %w", err) 230 } 231 232 return nil 233 } 234 return nil 235} 236 237func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error { 238 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 239 240 l.Info("fetching and adding existing collaborators") 241 242 xrpcc := xrpc.Client{ 243 Host: owner.PDSEndpoint(), 244 } 245 246 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false) 247 if err != nil { 248 return err 249 } 250 251 var errs error 252 for _, r := range resp.Records { 253 if r == nil { 254 continue 255 } 256 record := r.Value.Val.(*tangled.RepoCollaborator) 257 258 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 259 l.Error("failed to add repo to enforcer", "error", err) 260 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 261 } 262 } 263 264 return errs 265}