From 5f5fa5bfad0fbfc31b18ede35e7685c02d796bd6 Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Mon, 4 Aug 2025 08:50:41 +0100 Subject: [PATCH] knotserver,spindle: ingest collaborator record and update rbac Change-Id: ksqputkprxkuqyxuwqyzrnnzlpkqrqxx Signed-off-by: oppiliappan --- jetstream/jetstream.go | 13 ++++++ knotserver/ingester.go | 61 ++++++++++++++++++++++++ knotserver/server.go | 1 + spindle/ingester.go | 103 +++++++++++++++++++++++++++++++++++++++++ spindle/server.go | 1 + 5 files changed, 179 insertions(+) diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index be6c048..fd4506e 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -52,6 +52,19 @@ func (j *JetstreamClient) AddDid(did string) { j.mu.Unlock() } +func (j *JetstreamClient) RemoveDid(did string) { + if did == "" { + return + } + + if j.logDids { + j.l.Info("removing did from in-memory filter", "did", did) + } + j.mu.Lock() + delete(j.wantedDids, did) + j.mu.Unlock() +} + type processor func(context.Context, *models.Event) error func (j *JetstreamClient) withDidFilter(processFunc processor) processor { diff --git a/knotserver/ingester.go b/knotserver/ingester.go index abbd4e5..25afba5 100644 --- a/knotserver/ingester.go +++ b/knotserver/ingester.go @@ -213,6 +213,56 @@ func (h *Handle) processPull(ctx context.Context, did string, record tangled.Rep return h.db.InsertEvent(event, h.n) } +// duplicated from add collaborator +func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error { + repoAt, err := syntax.ParseATURI(record.Repo) + if err != nil { + return err + } + + resolver := idresolver.DefaultResolver() + + subjectId, err := resolver.ResolveIdent(ctx, record.Subject) + if err != nil || subjectId.Handle.IsInvalidHandle() { + return err + } + + // TODO: fix this for good, we need to fetch the record here unfortunately + // resolve this aturi to extract the repo record + owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) + if err != nil || owner.Handle.IsInvalidHandle() { + return fmt.Errorf("failed to resolve handle: %w", err) + } + + xrpcc := xrpc.Client{ + Host: owner.PDSEndpoint(), + } + + resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) + if err != nil { + return err + } + + repo := resp.Value.Val.(*tangled.Repo) + didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) + + // check perms for this user + if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { + return fmt.Errorf("insufficient permissions: %w", err) + } + + if err := h.db.AddDid(subjectId.DID.String()); err != nil { + return err + } + h.jc.AddDid(subjectId.DID.String()) + + if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { + return err + } + + return h.fetchAndAddKeys(ctx, subjectId.DID.String()) +} + func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { l := log.FromContext(ctx) @@ -292,6 +342,7 @@ func (h *Handle) processMessages(ctx context.Context, event *models.Event) error if err := h.processKnotMember(ctx, did, record); err != nil { return fmt.Errorf("failed to process knot member: %w", err) } + case tangled.RepoPullNSID: var record tangled.RepoPull if err := json.Unmarshal(raw, &record); err != nil { @@ -300,6 +351,16 @@ func (h *Handle) processMessages(ctx context.Context, event *models.Event) error if err := h.processPull(ctx, did, record); err != nil { return fmt.Errorf("failed to process knot member: %w", err) } + + case tangled.RepoCollaboratorNSID: + var record tangled.RepoCollaborator + if err := json.Unmarshal(raw, &record); err != nil { + return fmt.Errorf("failed to unmarshal record: %w", err) + } + if err := h.processCollaborator(ctx, did, record); err != nil { + return fmt.Errorf("failed to process knot member: %w", err) + } + } return err diff --git a/knotserver/server.go b/knotserver/server.go index 575795a..b23a159 100644 --- a/knotserver/server.go +++ b/knotserver/server.go @@ -76,6 +76,7 @@ func Run(ctx context.Context, cmd *cli.Command) error { tangled.PublicKeyNSID, tangled.KnotMemberNSID, tangled.RepoPullNSID, + tangled.RepoCollaboratorNSID, }, nil, logger, db, true, c.Server.LogDids) if err != nil { logger.Error("failed to setup jetstream", "error", err) diff --git a/spindle/ingester.go b/spindle/ingester.go index 7a8dfb2..51db36d 100644 --- a/spindle/ingester.go +++ b/spindle/ingester.go @@ -8,9 +8,15 @@ import ( "tangled.sh/tangled.sh/core/api/tangled" "tangled.sh/tangled.sh/core/eventconsumer" + "tangled.sh/tangled.sh/core/idresolver" "tangled.sh/tangled.sh/core/rbac" + comatproto "github.com/bluesky-social/indigo/api/atproto" + "github.com/bluesky-social/indigo/atproto/identity" + "github.com/bluesky-social/indigo/atproto/syntax" + "github.com/bluesky-social/indigo/xrpc" "github.com/bluesky-social/jetstream/pkg/models" + securejoin "github.com/cyphar/filepath-securejoin" ) type Ingester func(ctx context.Context, e *models.Event) error @@ -35,6 +41,8 @@ func (s *Spindle) ingest() Ingester { s.ingestMember(ctx, e) case tangled.RepoNSID: s.ingestRepo(ctx, e) + case tangled.RepoCollaboratorNSID: + s.ingestCollaborator(ctx, e) } return err @@ -144,3 +152,98 @@ func (s *Spindle) ingestRepo(_ context.Context, e *models.Event) error { } return nil } + +func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { + var err error + + l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) + + l.Info("ingesting collaborator record") + + switch e.Commit.Operation { + case models.CommitOperationCreate, models.CommitOperationUpdate: + raw := e.Commit.Record + record := tangled.RepoCollaborator{} + err = json.Unmarshal(raw, &record) + if err != nil { + l.Error("invalid record", "error", err) + return err + } + + resolver := idresolver.DefaultResolver() + + subjectId, err := resolver.ResolveIdent(ctx, record.Subject) + if err != nil || subjectId.Handle.IsInvalidHandle() { + return err + } + + repoAt, err := syntax.ParseATURI(record.Repo) + if err != nil { + l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) + return nil + } + + // TODO: get rid of this entirely + // resolve this aturi to extract the repo record + owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) + if err != nil || owner.Handle.IsInvalidHandle() { + return fmt.Errorf("failed to resolve handle: %w", err) + } + + xrpcc := xrpc.Client{ + Host: owner.PDSEndpoint(), + } + + resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) + if err != nil { + return err + } + + repo := resp.Value.Val.(*tangled.Repo) + didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) + + // check perms for this user + if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { + return fmt.Errorf("insufficient permissions: %w", err) + } + + // add collaborator to rbac + if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { + l.Error("failed to add repo to enforcer", "error", err) + return fmt.Errorf("failed to add repo: %w", err) + } + + return nil + } + return nil +} + +func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error { + l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") + + l.Info("fetching and adding existing collaborators") + + xrpcc := xrpc.Client{ + Host: owner.PDSEndpoint(), + } + + resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false) + if err != nil { + return err + } + + var errs error + for _, r := range resp.Records { + if r == nil { + continue + } + record := r.Value.Val.(*tangled.RepoCollaborator) + + if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { + l.Error("failed to add repo to enforcer", "error", err) + errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) + } + } + + return errs +} diff --git a/spindle/server.go b/spindle/server.go index 5ae5444..f414117 100644 --- a/spindle/server.go +++ b/spindle/server.go @@ -111,6 +111,7 @@ func Run(ctx context.Context) error { collections := []string{ tangled.SpindleMemberNSID, tangled.RepoNSID, + tangled.RepoCollaboratorNSID, } jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) if err != nil { -- 2.43.0