forked from tangled.org/core
this repo has no description
at master 8.7 kB view raw
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "time" 9 10 "tangled.sh/tangled.sh/core/api/tangled" 11 "tangled.sh/tangled.sh/core/eventconsumer" 12 "tangled.sh/tangled.sh/core/idresolver" 13 "tangled.sh/tangled.sh/core/rbac" 14 "tangled.sh/tangled.sh/core/spindle/db" 15 16 comatproto "github.com/bluesky-social/indigo/api/atproto" 17 "github.com/bluesky-social/indigo/atproto/identity" 18 "github.com/bluesky-social/indigo/atproto/syntax" 19 "github.com/bluesky-social/indigo/xrpc" 20 "github.com/bluesky-social/jetstream/pkg/models" 21 securejoin "github.com/cyphar/filepath-securejoin" 22) 23 24type Ingester func(ctx context.Context, e *models.Event) error 25 26func (s *Spindle) ingest() Ingester { 27 return func(ctx context.Context, e *models.Event) error { 28 var err error 29 defer func() { 30 eventTime := e.TimeUS 31 lastTimeUs := eventTime + 1 32 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 33 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 34 } 35 }() 36 37 if e.Kind != models.EventKindCommit { 38 return nil 39 } 40 41 switch e.Commit.Collection { 42 case tangled.SpindleMemberNSID: 43 err = s.ingestMember(ctx, e) 44 case tangled.RepoNSID: 45 err = s.ingestRepo(ctx, e) 46 case tangled.RepoCollaboratorNSID: 47 err = s.ingestCollaborator(ctx, e) 48 } 49 50 if err != nil { 51 s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 52 } 53 54 return nil 55 } 56} 57 58func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 59 var err error 60 did := e.Did 61 rkey := e.Commit.RKey 62 63 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 64 65 switch e.Commit.Operation { 66 case models.CommitOperationCreate, models.CommitOperationUpdate: 67 raw := e.Commit.Record 68 record := tangled.SpindleMember{} 69 err = json.Unmarshal(raw, &record) 70 if err != nil { 71 l.Error("invalid record", "error", err) 72 return err 73 } 74 75 domain := s.cfg.Server.Hostname 76 recordInstance := record.Instance 77 78 if recordInstance != domain { 79 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 80 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 81 } 82 83 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 84 if err != nil || !ok { 85 l.Error("failed to add member", "did", did, "error", err) 86 return fmt.Errorf("failed to enforce permissions: %w", err) 87 } 88 89 if err := db.AddSpindleMember(s.db, db.SpindleMember{ 90 Did: syntax.DID(did), 91 Rkey: rkey, 92 Instance: recordInstance, 93 Subject: syntax.DID(record.Subject), 94 Created: time.Now(), 95 }); err != nil { 96 l.Error("failed to add member", "error", err) 97 return fmt.Errorf("failed to add member: %w", err) 98 } 99 100 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil { 101 l.Error("failed to add member", "error", err) 102 return fmt.Errorf("failed to add member: %w", err) 103 } 104 l.Info("added member from firehose", "member", record.Subject) 105 106 if err := s.db.AddDid(record.Subject); err != nil { 107 l.Error("failed to add did", "error", err) 108 return fmt.Errorf("failed to add did: %w", err) 109 } 110 s.jc.AddDid(record.Subject) 111 112 return nil 113 114 case models.CommitOperationDelete: 115 record, err := db.GetSpindleMember(s.db, did, rkey) 116 if err != nil { 117 l.Error("failed to find member", "error", err) 118 return fmt.Errorf("failed to find member: %w", err) 119 } 120 121 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 122 l.Error("failed to remove member", "error", err) 123 return fmt.Errorf("failed to remove member: %w", err) 124 } 125 126 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil { 127 l.Error("failed to add member", "error", err) 128 return fmt.Errorf("failed to add member: %w", err) 129 } 130 l.Info("added member from firehose", "member", record.Subject) 131 132 if err := s.db.RemoveDid(record.Subject.String()); err != nil { 133 l.Error("failed to add did", "error", err) 134 return fmt.Errorf("failed to add did: %w", err) 135 } 136 s.jc.RemoveDid(record.Subject.String()) 137 138 } 139 return nil 140} 141 142func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 143 var err error 144 did := e.Did 145 resolver := idresolver.DefaultResolver() 146 147 l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 148 149 l.Info("ingesting repo record") 150 151 switch e.Commit.Operation { 152 case models.CommitOperationCreate, models.CommitOperationUpdate: 153 raw := e.Commit.Record 154 record := tangled.Repo{} 155 err = json.Unmarshal(raw, &record) 156 if err != nil { 157 l.Error("invalid record", "error", err) 158 return err 159 } 160 161 domain := s.cfg.Server.Hostname 162 163 // no spindle configured for this repo 164 if record.Spindle == nil { 165 l.Info("no spindle configured", "did", record.Owner, "name", record.Name) 166 return nil 167 } 168 169 // this repo did not want this spindle 170 if *record.Spindle != domain { 171 l.Info("different spindle configured", "did", record.Owner, "name", record.Name, "spindle", *record.Spindle, "domain", domain) 172 return nil 173 } 174 175 // add this repo to the watch list 176 if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil { 177 l.Error("failed to add repo", "error", err) 178 return fmt.Errorf("failed to add repo: %w", err) 179 } 180 181 didSlashRepo, err := securejoin.SecureJoin(record.Owner, record.Name) 182 if err != nil { 183 return err 184 } 185 186 // add repo to rbac 187 if err := s.e.AddRepo(record.Owner, rbac.ThisServer, didSlashRepo); err != nil { 188 l.Error("failed to add repo to enforcer", "error", err) 189 return fmt.Errorf("failed to add repo: %w", err) 190 } 191 192 // add collaborators to rbac 193 owner, err := resolver.ResolveIdent(ctx, did) 194 if err != nil || owner.Handle.IsInvalidHandle() { 195 return err 196 } 197 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil { 198 return err 199 } 200 201 // add this knot to the event consumer 202 src := eventconsumer.NewKnotSource(record.Knot) 203 s.ks.AddSource(context.Background(), src) 204 205 return nil 206 207 } 208 return nil 209} 210 211func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 212 var err error 213 214 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 215 216 l.Info("ingesting collaborator record") 217 218 switch e.Commit.Operation { 219 case models.CommitOperationCreate, models.CommitOperationUpdate: 220 raw := e.Commit.Record 221 record := tangled.RepoCollaborator{} 222 err = json.Unmarshal(raw, &record) 223 if err != nil { 224 l.Error("invalid record", "error", err) 225 return err 226 } 227 228 resolver := idresolver.DefaultResolver() 229 230 subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 231 if err != nil || subjectId.Handle.IsInvalidHandle() { 232 return err 233 } 234 235 repoAt, err := syntax.ParseATURI(record.Repo) 236 if err != nil { 237 l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 238 return nil 239 } 240 241 // TODO: get rid of this entirely 242 // resolve this aturi to extract the repo record 243 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 244 if err != nil || owner.Handle.IsInvalidHandle() { 245 return fmt.Errorf("failed to resolve handle: %w", err) 246 } 247 248 xrpcc := xrpc.Client{ 249 Host: owner.PDSEndpoint(), 250 } 251 252 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 253 if err != nil { 254 return err 255 } 256 257 repo := resp.Value.Val.(*tangled.Repo) 258 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 259 260 // check perms for this user 261 if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 262 return fmt.Errorf("insufficient permissions: %w", err) 263 } 264 265 // add collaborator to rbac 266 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 267 l.Error("failed to add repo to enforcer", "error", err) 268 return fmt.Errorf("failed to add repo: %w", err) 269 } 270 271 return nil 272 } 273 return nil 274} 275 276func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error { 277 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 278 279 l.Info("fetching and adding existing collaborators") 280 281 xrpcc := xrpc.Client{ 282 Host: owner.PDSEndpoint(), 283 } 284 285 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false) 286 if err != nil { 287 return err 288 } 289 290 var errs error 291 for _, r := range resp.Records { 292 if r == nil { 293 continue 294 } 295 record := r.Value.Val.(*tangled.RepoCollaborator) 296 297 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 298 l.Error("failed to add repo to enforcer", "error", err) 299 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 300 } 301 } 302 303 return errs 304}