forked from tangled.org/core
this repo has no description
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "time" 9 10 "tangled.sh/tangled.sh/core/api/tangled" 11 "tangled.sh/tangled.sh/core/eventconsumer" 12 "tangled.sh/tangled.sh/core/idresolver" 13 "tangled.sh/tangled.sh/core/rbac" 14 "tangled.sh/tangled.sh/core/spindle/db" 15 16 comatproto "github.com/bluesky-social/indigo/api/atproto" 17 "github.com/bluesky-social/indigo/atproto/identity" 18 "github.com/bluesky-social/indigo/atproto/syntax" 19 "github.com/bluesky-social/indigo/xrpc" 20 "github.com/bluesky-social/jetstream/pkg/models" 21 securejoin "github.com/cyphar/filepath-securejoin" 22) 23 24type Ingester func(ctx context.Context, e *models.Event) error 25 26func (s *Spindle) ingest() Ingester { 27 return func(ctx context.Context, e *models.Event) error { 28 var err error 29 defer func() { 30 eventTime := e.TimeUS 31 lastTimeUs := eventTime + 1 32 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 33 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 34 } 35 }() 36 37 if e.Kind != models.EventKindCommit { 38 return nil 39 } 40 41 switch e.Commit.Collection { 42 case tangled.SpindleMemberNSID: 43 s.ingestMember(ctx, e) 44 case tangled.RepoNSID: 45 s.ingestRepo(ctx, e) 46 case tangled.RepoCollaboratorNSID: 47 s.ingestCollaborator(ctx, e) 48 } 49 50 return err 51 } 52} 53 54func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 55 var err error 56 did := e.Did 57 rkey := e.Commit.RKey 58 59 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 60 61 switch e.Commit.Operation { 62 case models.CommitOperationCreate, models.CommitOperationUpdate: 63 raw := e.Commit.Record 64 record := tangled.SpindleMember{} 65 err = json.Unmarshal(raw, &record) 66 if err != nil { 67 l.Error("invalid record", "error", err) 68 return err 69 } 70 71 domain := s.cfg.Server.Hostname 72 recordInstance := record.Instance 73 74 if recordInstance != domain { 75 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 76 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 77 } 78 79 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 80 if err != nil || !ok { 81 l.Error("failed to add member", "did", did, "error", err) 82 return fmt.Errorf("failed to enforce permissions: %w", err) 83 } 84 85 if err := db.AddSpindleMember(s.db, db.SpindleMember{ 86 Did: syntax.DID(did), 87 Rkey: rkey, 88 Instance: recordInstance, 89 Subject: syntax.DID(record.Subject), 90 Created: time.Now(), 91 }); err != nil { 92 l.Error("failed to add member", "error", err) 93 return fmt.Errorf("failed to add member: %w", err) 94 } 95 96 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil { 97 l.Error("failed to add member", "error", err) 98 return fmt.Errorf("failed to add member: %w", err) 99 } 100 l.Info("added member from firehose", "member", record.Subject) 101 102 if err := s.db.AddDid(record.Subject); err != nil { 103 l.Error("failed to add did", "error", err) 104 return fmt.Errorf("failed to add did: %w", err) 105 } 106 s.jc.AddDid(record.Subject) 107 108 return nil 109 110 case models.CommitOperationDelete: 111 record, err := db.GetSpindleMember(s.db, did, rkey) 112 if err != nil { 113 l.Error("failed to find member", "error", err) 114 return fmt.Errorf("failed to find member: %w", err) 115 } 116 117 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 118 l.Error("failed to remove member", "error", err) 119 return fmt.Errorf("failed to remove member: %w", err) 120 } 121 122 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil { 123 l.Error("failed to add member", "error", err) 124 return fmt.Errorf("failed to add member: %w", err) 125 } 126 l.Info("added member from firehose", "member", record.Subject) 127 128 if err := s.db.RemoveDid(record.Subject.String()); err != nil { 129 l.Error("failed to add did", "error", err) 130 return fmt.Errorf("failed to add did: %w", err) 131 } 132 s.jc.RemoveDid(record.Subject.String()) 133 134 } 135 return nil 136} 137 138func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 139 var err error 140 did := e.Did 141 resolver := idresolver.DefaultResolver() 142 143 l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 144 145 l.Info("ingesting repo record") 146 147 switch e.Commit.Operation { 148 case models.CommitOperationCreate, models.CommitOperationUpdate: 149 raw := e.Commit.Record 150 record := tangled.Repo{} 151 err = json.Unmarshal(raw, &record) 152 if err != nil { 153 l.Error("invalid record", "error", err) 154 return err 155 } 156 157 domain := s.cfg.Server.Hostname 158 159 // no spindle configured for this repo 160 if record.Spindle == nil { 161 l.Info("no spindle configured", "did", record.Owner, "name", record.Name) 162 return nil 163 } 164 165 // this repo did not want this spindle 166 if *record.Spindle != domain { 167 l.Info("different spindle configured", "did", record.Owner, "name", record.Name, "spindle", *record.Spindle, "domain", domain) 168 return nil 169 } 170 171 // add this repo to the watch list 172 if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil { 173 l.Error("failed to add repo", "error", err) 174 return fmt.Errorf("failed to add repo: %w", err) 175 } 176 177 didSlashRepo, err := securejoin.SecureJoin(record.Owner, record.Name) 178 if err != nil { 179 return err 180 } 181 182 // add repo to rbac 183 if err := s.e.AddRepo(record.Owner, rbac.ThisServer, didSlashRepo); err != nil { 184 l.Error("failed to add repo to enforcer", "error", err) 185 return fmt.Errorf("failed to add repo: %w", err) 186 } 187 188 // add collaborators to rbac 189 owner, err := resolver.ResolveIdent(ctx, did) 190 if err != nil || owner.Handle.IsInvalidHandle() { 191 return err 192 } 193 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil { 194 return err 195 } 196 197 // add this knot to the event consumer 198 src := eventconsumer.NewKnotSource(record.Knot) 199 s.ks.AddSource(context.Background(), src) 200 201 return nil 202 203 } 204 return nil 205} 206 207func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 208 var err error 209 210 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 211 212 l.Info("ingesting collaborator record") 213 214 switch e.Commit.Operation { 215 case models.CommitOperationCreate, models.CommitOperationUpdate: 216 raw := e.Commit.Record 217 record := tangled.RepoCollaborator{} 218 err = json.Unmarshal(raw, &record) 219 if err != nil { 220 l.Error("invalid record", "error", err) 221 return err 222 } 223 224 resolver := idresolver.DefaultResolver() 225 226 subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 227 if err != nil || subjectId.Handle.IsInvalidHandle() { 228 return err 229 } 230 231 repoAt, err := syntax.ParseATURI(record.Repo) 232 if err != nil { 233 l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 234 return nil 235 } 236 237 // TODO: get rid of this entirely 238 // resolve this aturi to extract the repo record 239 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 240 if err != nil || owner.Handle.IsInvalidHandle() { 241 return fmt.Errorf("failed to resolve handle: %w", err) 242 } 243 244 xrpcc := xrpc.Client{ 245 Host: owner.PDSEndpoint(), 246 } 247 248 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 249 if err != nil { 250 return err 251 } 252 253 repo := resp.Value.Val.(*tangled.Repo) 254 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 255 256 // check perms for this user 257 if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 258 return fmt.Errorf("insufficient permissions: %w", err) 259 } 260 261 // add collaborator to rbac 262 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 263 l.Error("failed to add repo to enforcer", "error", err) 264 return fmt.Errorf("failed to add repo: %w", err) 265 } 266 267 return nil 268 } 269 return nil 270} 271 272func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error { 273 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 274 275 l.Info("fetching and adding existing collaborators") 276 277 xrpcc := xrpc.Client{ 278 Host: owner.PDSEndpoint(), 279 } 280 281 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false) 282 if err != nil { 283 return err 284 } 285 286 var errs error 287 for _, r := range resp.Records { 288 if r == nil { 289 continue 290 } 291 record := r.Value.Val.(*tangled.RepoCollaborator) 292 293 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 294 l.Error("failed to add repo to enforcer", "error", err) 295 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 296 } 297 } 298 299 return errs 300}