knotserver,spindle: ingest collaborator record and update rbac #398

merged
opened by oppi.li targeting master from push-vuzywsvmkwqn
Changed files
+179
jetstream
knotserver
spindle
+13
jetstream/jetstream.go
···
j.mu.Unlock()
}
+
func (j *JetstreamClient) RemoveDid(did string) {
+
if did == "" {
+
return
+
}
+
+
if j.logDids {
+
j.l.Info("removing did from in-memory filter", "did", did)
+
}
+
j.mu.Lock()
+
delete(j.wantedDids, did)
+
j.mu.Unlock()
+
}
+
type processor func(context.Context, *models.Event) error
func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
+61
knotserver/ingester.go
···
return h.db.InsertEvent(event, h.n)
}
+
// duplicated from add collaborator
+
func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error {
+
repoAt, err := syntax.ParseATURI(record.Repo)
+
if err != nil {
+
return err
+
}
+
+
resolver := idresolver.DefaultResolver()
+
+
subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
+
if err != nil || subjectId.Handle.IsInvalidHandle() {
+
return err
+
}
+
+
// TODO: fix this for good, we need to fetch the record here unfortunately
+
// resolve this aturi to extract the repo record
+
owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
+
if err != nil || owner.Handle.IsInvalidHandle() {
+
return fmt.Errorf("failed to resolve handle: %w", err)
+
}
+
+
xrpcc := xrpc.Client{
+
Host: owner.PDSEndpoint(),
+
}
+
+
resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
+
if err != nil {
+
return err
+
}
+
+
repo := resp.Value.Val.(*tangled.Repo)
+
didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
+
+
// check perms for this user
+
if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
+
return fmt.Errorf("insufficient permissions: %w", err)
+
}
+
+
if err := h.db.AddDid(subjectId.DID.String()); err != nil {
+
return err
+
}
+
h.jc.AddDid(subjectId.DID.String())
+
+
if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
+
return err
+
}
+
+
return h.fetchAndAddKeys(ctx, subjectId.DID.String())
+
}
+
func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
l := log.FromContext(ctx)
···
if err := h.processKnotMember(ctx, did, record); err != nil {
return fmt.Errorf("failed to process knot member: %w", err)
}
+
case tangled.RepoPullNSID:
var record tangled.RepoPull
if err := json.Unmarshal(raw, &record); err != nil {
···
if err := h.processPull(ctx, did, record); err != nil {
return fmt.Errorf("failed to process knot member: %w", err)
}
+
+
case tangled.RepoCollaboratorNSID:
+
var record tangled.RepoCollaborator
+
if err := json.Unmarshal(raw, &record); err != nil {
+
return fmt.Errorf("failed to unmarshal record: %w", err)
+
}
+
if err := h.processCollaborator(ctx, did, record); err != nil {
+
return fmt.Errorf("failed to process knot member: %w", err)
+
}
+
}
return err
+1
knotserver/server.go
···
tangled.PublicKeyNSID,
tangled.KnotMemberNSID,
tangled.RepoPullNSID,
+
tangled.RepoCollaboratorNSID,
}, nil, logger, db, true, c.Server.LogDids)
if err != nil {
logger.Error("failed to setup jetstream", "error", err)
+103
spindle/ingester.go
···
"tangled.sh/tangled.sh/core/api/tangled"
"tangled.sh/tangled.sh/core/eventconsumer"
+
"tangled.sh/tangled.sh/core/idresolver"
"tangled.sh/tangled.sh/core/rbac"
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/atproto/identity"
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
"github.com/bluesky-social/indigo/xrpc"
"github.com/bluesky-social/jetstream/pkg/models"
+
securejoin "github.com/cyphar/filepath-securejoin"
)
type Ingester func(ctx context.Context, e *models.Event) error
···
s.ingestMember(ctx, e)
case tangled.RepoNSID:
s.ingestRepo(ctx, e)
+
case tangled.RepoCollaboratorNSID:
+
s.ingestCollaborator(ctx, e)
}
return err
···
}
return nil
}
+
+
func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error {
+
var err error
+
+
l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did)
+
+
l.Info("ingesting collaborator record")
+
+
switch e.Commit.Operation {
+
case models.CommitOperationCreate, models.CommitOperationUpdate:
+
raw := e.Commit.Record
+
record := tangled.RepoCollaborator{}
+
err = json.Unmarshal(raw, &record)
+
if err != nil {
+
l.Error("invalid record", "error", err)
+
return err
+
}
+
+
resolver := idresolver.DefaultResolver()
+
+
subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
+
if err != nil || subjectId.Handle.IsInvalidHandle() {
+
return err
+
}
+
+
repoAt, err := syntax.ParseATURI(record.Repo)
+
if err != nil {
+
l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo)
+
return nil
+
}
+
+
// TODO: get rid of this entirely
+
// resolve this aturi to extract the repo record
+
owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
+
if err != nil || owner.Handle.IsInvalidHandle() {
+
return fmt.Errorf("failed to resolve handle: %w", err)
+
}
+
+
xrpcc := xrpc.Client{
+
Host: owner.PDSEndpoint(),
+
}
+
+
resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
+
if err != nil {
+
return err
+
}
+
+
repo := resp.Value.Val.(*tangled.Repo)
+
didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
+
+
// check perms for this user
+
if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
+
return fmt.Errorf("insufficient permissions: %w", err)
+
}
+
+
// add collaborator to rbac
+
if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
+
l.Error("failed to add repo to enforcer", "error", err)
+
return fmt.Errorf("failed to add repo: %w", err)
+
}
+
+
return nil
+
}
+
return nil
+
}
+
+
func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error {
+
l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators")
+
+
l.Info("fetching and adding existing collaborators")
+
+
xrpcc := xrpc.Client{
+
Host: owner.PDSEndpoint(),
+
}
+
+
resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false)
+
if err != nil {
+
return err
+
}
+
+
var errs error
+
for _, r := range resp.Records {
+
if r == nil {
+
continue
+
}
+
record := r.Value.Val.(*tangled.RepoCollaborator)
+
+
if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
+
l.Error("failed to add repo to enforcer", "error", err)
+
errors.Join(errs, fmt.Errorf("failed to add repo: %w", err))
+
}
+
}
+
+
return errs
+
}
+1
spindle/server.go
···
collections := []string{
tangled.SpindleMemberNSID,
tangled.RepoNSID,
+
tangled.RepoCollaboratorNSID,
}
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
if err != nil {