forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "time" 9 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/eventconsumer" 12 "tangled.org/core/rbac" 13 "tangled.org/core/spindle/db" 14 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 "github.com/bluesky-social/indigo/atproto/identity" 17 "github.com/bluesky-social/indigo/atproto/syntax" 18 "github.com/bluesky-social/indigo/xrpc" 19 "github.com/bluesky-social/jetstream/pkg/models" 20 securejoin "github.com/cyphar/filepath-securejoin" 21) 22 23type Ingester func(ctx context.Context, e *models.Event) error 24 25func (s *Spindle) ingest() Ingester { 26 return func(ctx context.Context, e *models.Event) error { 27 var err error 28 defer func() { 29 eventTime := e.TimeUS 30 lastTimeUs := eventTime + 1 31 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 32 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 33 } 34 }() 35 36 if e.Kind != models.EventKindCommit { 37 return nil 38 } 39 40 switch e.Commit.Collection { 41 case tangled.SpindleMemberNSID: 42 err = s.ingestMember(ctx, e) 43 case tangled.RepoNSID: 44 err = s.ingestRepo(ctx, e) 45 case tangled.RepoCollaboratorNSID: 46 err = s.ingestCollaborator(ctx, e) 47 } 48 49 if err != nil { 50 s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 51 } 52 53 return nil 54 } 55} 56 57func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 58 var err error 59 did := e.Did 60 rkey := e.Commit.RKey 61 62 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 63 64 switch e.Commit.Operation { 65 case models.CommitOperationCreate, models.CommitOperationUpdate: 66 raw := e.Commit.Record 67 record := tangled.SpindleMember{} 68 err = json.Unmarshal(raw, &record) 69 if err != nil { 70 l.Error("invalid record", "error", err) 71 return err 72 } 73 74 domain := s.cfg.Server.Hostname 75 recordInstance := record.Instance 76 77 if recordInstance != domain { 78 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 79 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 80 } 81 82 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 83 if err != nil || !ok { 84 l.Error("failed to add member", "did", did, "error", err) 85 return fmt.Errorf("failed to enforce permissions: %w", err) 86 } 87 88 if err := db.AddSpindleMember(s.db, db.SpindleMember{ 89 Did: syntax.DID(did), 90 Rkey: rkey, 91 Instance: recordInstance, 92 Subject: syntax.DID(record.Subject), 93 Created: time.Now(), 94 }); err != nil { 95 l.Error("failed to add member", "error", err) 96 return fmt.Errorf("failed to add member: %w", err) 97 } 98 99 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil { 100 l.Error("failed to add member", "error", err) 101 return fmt.Errorf("failed to add member: %w", err) 102 } 103 l.Info("added member from firehose", "member", record.Subject) 104 105 if err := s.db.AddDid(record.Subject); err != nil { 106 l.Error("failed to add did", "error", err) 107 return fmt.Errorf("failed to add did: %w", err) 108 } 109 s.jc.AddDid(record.Subject) 110 111 return nil 112 113 case models.CommitOperationDelete: 114 record, err := db.GetSpindleMember(s.db, did, rkey) 115 if err != nil { 116 l.Error("failed to find member", "error", err) 117 return fmt.Errorf("failed to find member: %w", err) 118 } 119 120 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 121 l.Error("failed to remove member", "error", err) 122 return fmt.Errorf("failed to remove member: %w", err) 123 } 124 125 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil { 126 l.Error("failed to add member", "error", err) 127 return fmt.Errorf("failed to add member: %w", err) 128 } 129 l.Info("added member from firehose", "member", record.Subject) 130 131 if err := s.db.RemoveDid(record.Subject.String()); err != nil { 132 l.Error("failed to add did", "error", err) 133 return fmt.Errorf("failed to add did: %w", err) 134 } 135 s.jc.RemoveDid(record.Subject.String()) 136 137 } 138 return nil 139} 140 141func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 142 var err error 143 did := e.Did 144 145 l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 146 147 l.Info("ingesting repo record", "did", did) 148 149 switch e.Commit.Operation { 150 case models.CommitOperationCreate, models.CommitOperationUpdate: 151 raw := e.Commit.Record 152 record := tangled.Repo{} 153 err = json.Unmarshal(raw, &record) 154 if err != nil { 155 l.Error("invalid record", "error", err) 156 return err 157 } 158 159 domain := s.cfg.Server.Hostname 160 161 // no spindle configured for this repo 162 if record.Spindle == nil { 163 l.Info("no spindle configured", "name", record.Name) 164 return nil 165 } 166 167 // this repo did not want this spindle 168 if *record.Spindle != domain { 169 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 170 return nil 171 } 172 173 // add this repo to the watch list 174 if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil { 175 l.Error("failed to add repo", "error", err) 176 return fmt.Errorf("failed to add repo: %w", err) 177 } 178 179 didSlashRepo, err := securejoin.SecureJoin(did, record.Name) 180 if err != nil { 181 return err 182 } 183 184 // add repo to rbac 185 if err := s.e.AddRepo(did, rbac.ThisServer, didSlashRepo); err != nil { 186 l.Error("failed to add repo to enforcer", "error", err) 187 return fmt.Errorf("failed to add repo: %w", err) 188 } 189 190 // add collaborators to rbac 191 owner, err := s.res.ResolveIdent(ctx, did) 192 if err != nil || owner.Handle.IsInvalidHandle() { 193 return err 194 } 195 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil { 196 return err 197 } 198 199 // add this knot to the event consumer 200 src := eventconsumer.NewKnotSource(record.Knot) 201 s.ks.AddSource(context.Background(), src) 202 203 return nil 204 205 } 206 return nil 207} 208 209func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 210 var err error 211 212 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 213 214 l.Info("ingesting collaborator record") 215 216 switch e.Commit.Operation { 217 case models.CommitOperationCreate, models.CommitOperationUpdate: 218 raw := e.Commit.Record 219 record := tangled.RepoCollaborator{} 220 err = json.Unmarshal(raw, &record) 221 if err != nil { 222 l.Error("invalid record", "error", err) 223 return err 224 } 225 226 subjectId, err := s.res.ResolveIdent(ctx, record.Subject) 227 if err != nil || subjectId.Handle.IsInvalidHandle() { 228 return err 229 } 230 231 repoAt, err := syntax.ParseATURI(record.Repo) 232 if err != nil { 233 l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 234 return nil 235 } 236 237 // TODO: get rid of this entirely 238 // resolve this aturi to extract the repo record 239 owner, err := s.res.ResolveIdent(ctx, repoAt.Authority().String()) 240 if err != nil || owner.Handle.IsInvalidHandle() { 241 return fmt.Errorf("failed to resolve handle: %w", err) 242 } 243 244 xrpcc := xrpc.Client{ 245 Host: owner.PDSEndpoint(), 246 } 247 248 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 249 if err != nil { 250 return err 251 } 252 253 repo := resp.Value.Val.(*tangled.Repo) 254 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 255 256 // check perms for this user 257 if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 258 return fmt.Errorf("insufficient permissions: %w", err) 259 } 260 261 // add collaborator to rbac 262 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 263 l.Error("failed to add repo to enforcer", "error", err) 264 return fmt.Errorf("failed to add repo: %w", err) 265 } 266 267 return nil 268 } 269 return nil 270} 271 272func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error { 273 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 274 275 l.Info("fetching and adding existing collaborators") 276 277 xrpcc := xrpc.Client{ 278 Host: owner.PDSEndpoint(), 279 } 280 281 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false) 282 if err != nil { 283 return err 284 } 285 286 var errs error 287 for _, r := range resp.Records { 288 if r == nil { 289 continue 290 } 291 record := r.Value.Val.(*tangled.RepoCollaborator) 292 293 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 294 l.Error("failed to add repo to enforcer", "error", err) 295 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 296 } 297 } 298 299 return errs 300}