package knotserver

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"path/filepath"
	"slices"
	"strings"
	"time"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/xrpc"
	"github.com/bluesky-social/jetstream/pkg/models"
	securejoin "github.com/cyphar/filepath-securejoin"
	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/idresolver"
	"tangled.sh/tangled.sh/core/knotserver/db"
	"tangled.sh/tangled.sh/core/knotserver/git"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/rbac"
	"tangled.sh/tangled.sh/core/workflow"
)

// processPublicKey applies a public-key record event for did to the local
// database: create/update stores the key, delete removes the keys registered
// for did. Any other operation is a no-op.
func (h *Handle) processPublicKey(ctx context.Context, did string, operation string, record tangled.PublicKey) error {
	l := log.FromContext(ctx)

	switch operation {
	case models.CommitOperationCreate, models.CommitOperationUpdate:
		pk := db.PublicKey{
			Did:       did,
			PublicKey: record,
		}
		if err := h.db.AddPublicKey(pk); err != nil {
			l.Error("failed to add public key", "error", err)
			return fmt.Errorf("failed to add public key: %w", err)
		}
		l.Info("added public key from firehose", "did", did)

	case models.CommitOperationDelete:
		if err := h.db.RemovePublicKey(did); err != nil {
			l.Error("failed to remove public key", "error", err)
			return fmt.Errorf("failed to remove public key: %w", err)
		}
		l.Info("removed public key (delete triggered from firehose)", "did", did)
	}

	return nil
}

// processKnotMember applies a knot-member record event authored by did.
//
// On create/update the record must target this server's hostname and did must
// hold the "server:invite" permission; record.Subject is then added as a
// member, registered for firehose watching, and has its keys fetched. On
// delete record.Subject is removed as a member and unregistered.
func (h *Handle) processKnotMember(ctx context.Context, did string, operation string, record tangled.KnotMember) error {
	l := log.FromContext(ctx)

	switch operation {
	case models.CommitOperationCreate, models.CommitOperationUpdate:
		if record.Domain != h.c.Server.Hostname {
			l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
			return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
		}

		// Keep the enforcer failure and the plain denial apart: wrapping a nil
		// error with %w would produce a garbled "%!w(<nil>)" message.
		ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
		if err != nil {
			l.Error("failed to add member", "did", did, "error", err)
			return fmt.Errorf("failed to enforce permissions: %w", err)
		}
		if !ok {
			l.Error("failed to add member", "did", did)
			return fmt.Errorf("failed to enforce permissions: %s is not allowed to invite members", did)
		}

		if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
			l.Error("failed to add member", "error", err)
			return fmt.Errorf("failed to add member: %w", err)
		}
		l.Info("added member from firehose", "member", record.Subject)

		// Track the newly added member (the subject), not the inviter: this
		// mirrors the delete branch, which unregisters record.Subject.
		if err := h.db.AddDid(record.Subject); err != nil {
			l.Error("failed to add did", "error", err)
			return fmt.Errorf("failed to add did: %w", err)
		}
		h.jc.AddDid(record.Subject)

		if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
			return fmt.Errorf("failed to fetch and add keys: %w", err)
		}

	case models.CommitOperationDelete:
		if err := h.e.RemoveKnotMember(rbac.ThisServer, record.Subject); err != nil {
			l.Error("failed to remove member", "error", err)
			return fmt.Errorf("failed to remove member: %w", err)
		}
		l.Info("removed member (delete triggered from firehose)", "member", record.Subject)

		if err := h.db.RemoveDid(record.Subject); err != nil {
			l.Error("failed to remove did", "error", err)
			return fmt.Errorf("failed to remove did: %w", err)
		}
		h.jc.RemoveDid(record.Subject)
	}

	return nil
}

// fetchRepoRecordFromPDS retrieves the repo record addressed by repoAt from
// the PDS at pdsHost. It returns an error, rather than panicking, when the
// response carries no value or the value is not a *tangled.Repo.
func fetchRepoRecordFromPDS(ctx context.Context, pdsHost string, repoAt syntax.ATURI) (*tangled.Repo, error) {
	client := xrpc.Client{
		Host: pdsHost,
	}

	resp, err := comatproto.RepoGetRecord(ctx, &client, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
	if err != nil {
		return nil, err
	}
	if resp.Value == nil {
		return nil, fmt.Errorf("repo record %s has no value", repoAt)
	}

	repo, ok := resp.Value.Val.(*tangled.Repo)
	if !ok || repo == nil {
		return nil, fmt.Errorf("record %s is not a repo record (got %T)", repoAt, resp.Value.Val)
	}
	return repo, nil
}

// processPull handles a pull-request record authored by did and, when the
// pull is branch-based, targets a repo hosted on this knot, and comes from a
// known DID, compiles the workflows found on the source branch into a
// pipeline event and stores it.
//
// Pulls that are not branch-based, are fork-based, come from unknown DIDs, or
// target another knot are reported as errors. A pull whose compiled pipeline
// has no workflows is accepted without inserting an event.
func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
	l := log.FromContext(ctx).With(
		"handler", "processPull",
		"did", did,
		"target_repo", record.TargetRepo,
		"target_branch", record.TargetBranch,
	)

	if record.Source == nil {
		reason := "not a branch-based pull request"
		l.Info("ignoring pull record", "reason", reason)
		return fmt.Errorf("ignoring pull record: %s", reason)
	}

	if record.Source.Repo != nil {
		reason := "fork based pull"
		l.Info("ignoring pull record", "reason", reason)
		return fmt.Errorf("ignoring pull record: %s", reason)
	}

	allDids, err := h.db.GetAllDids()
	if err != nil {
		return err
	}

	// presently: we only process PRs from collaborators for pipelines
	if !slices.Contains(allDids, did) {
		reason := "not a known did"
		l.Info("rejecting pull record", "reason", reason)
		return fmt.Errorf("rejected pull record: %s, %s", reason, did)
	}

	repoAt, err := syntax.ParseATURI(record.TargetRepo)
	if err != nil {
		return err
	}

	// resolve this aturi to extract the repo record
	resolver := idresolver.DefaultResolver()
	ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
	if err != nil {
		return fmt.Errorf("failed to resolve handle: %w", err)
	}
	if ident.Handle.IsInvalidHandle() {
		return fmt.Errorf("failed to resolve handle: invalid handle for %s", repoAt.Authority().String())
	}

	repo, err := fetchRepoRecordFromPDS(ctx, ident.PDSEndpoint(), repoAt)
	if err != nil {
		return err
	}

	if repo.Knot != h.c.Server.Hostname {
		reason := "not this knot"
		l.Info("rejecting pull record", "reason", reason)
		return fmt.Errorf("rejected pull record: %s", reason)
	}

	didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
	if err != nil {
		return err
	}

	repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
	if err != nil {
		return err
	}

	gr, err := git.Open(repoPath, record.Source.Branch)
	if err != nil {
		return err
	}

	workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
	if err != nil {
		return err
	}

	var pipeline workflow.Pipeline
	for _, e := range workflowDir {
		if !e.IsFile {
			continue
		}

		fpath := filepath.Join(workflow.WorkflowDir, e.Name)
		contents, err := gr.RawContent(fpath)
		if err != nil {
			// A single unreadable workflow must not block the others, but
			// leave a trace instead of dropping it silently.
			l.Error("failed to read workflow", "err", err, "path", fpath)
			continue
		}

		wf, err := workflow.FromFile(e.Name, contents)
		if err != nil {
			// TODO: log here, respond to client that is pushing
			h.l.Error("failed to parse workflow", "err", err, "path", fpath)
			continue
		}

		pipeline = append(pipeline, wf)
	}

	trigger := tangled.Pipeline_PullRequestTriggerData{
		Action:       "create",
		SourceBranch: record.Source.Branch,
		SourceSha:    record.Source.Sha,
		TargetBranch: record.TargetBranch,
	}

	compiler := workflow.Compiler{
		Trigger: tangled.Pipeline_TriggerMetadata{
			Kind:        string(workflow.TriggerKindPullRequest),
			PullRequest: &trigger,
			Repo: &tangled.Pipeline_TriggerRepo{
				Did:  repo.Owner,
				Knot: repo.Knot,
				Repo: repo.Name,
			},
		},
	}

	cp := compiler.Compile(pipeline)

	// do not run empty pipelines; checked first so nothing is marshalled for
	// an event that will never be stored
	if cp.Workflows == nil {
		return nil
	}

	eventJson, err := json.Marshal(cp)
	if err != nil {
		return err
	}

	event := db.Event{
		Rkey:      TID(),
		Nsid:      tangled.PipelineNSID,
		EventJson: string(eventJson),
	}

	return h.db.InsertEvent(event, h.n)
}

// resolveCollaboratorTarget resolves the identities referenced by a
// collaborator record. It returns the subject's DID and the
// "<owner did>/<repo name>" path of the repo the record points at, after
// fetching the repo record from the owner's PDS.
func (h *Handle) resolveCollaboratorTarget(ctx context.Context, record tangled.RepoCollaborator) (subjectDid string, didSlashRepo string, err error) {
	repoAt, err := syntax.ParseATURI(record.Repo)
	if err != nil {
		return "", "", err
	}

	resolver := h.resolver

	subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
	if err != nil {
		return "", "", fmt.Errorf("failed to resolve subject: %w", err)
	}
	if subjectId.Handle.IsInvalidHandle() {
		// Report the problem instead of returning a nil error, which would
		// make the caller believe the record was applied.
		return "", "", fmt.Errorf("failed to resolve subject: invalid handle for %s", record.Subject)
	}

	// TODO: fix this for good, we need to fetch the record here unfortunately
	// resolve this aturi to extract the repo record
	owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
	if err != nil {
		return "", "", fmt.Errorf("failed to resolve handle: %w", err)
	}
	if owner.Handle.IsInvalidHandle() {
		return "", "", fmt.Errorf("failed to resolve handle: invalid handle for %s", repoAt.Authority().String())
	}

	repo, err := fetchRepoRecordFromPDS(ctx, owner.PDSEndpoint(), repoAt)
	if err != nil {
		return "", "", err
	}

	didSlashRepo, err = securejoin.SecureJoin(owner.DID.String(), repo.Name)
	if err != nil {
		return "", "", fmt.Errorf("failed to build repo path: %w", err)
	}

	return subjectId.DID.String(), didSlashRepo, nil
}

// processCollaborator applies a repo-collaborator record event authored by
// did.
//
// On create/update the author must be allowed to invite collaborators to the
// referenced repo; the subject is then registered for firehose watching,
// granted collaborator access, and has its keys fetched. On delete the
// subject's collaborator access is removed. Any other operation is a no-op.
//
// duplicated from add collaborator
func (h *Handle) processCollaborator(ctx context.Context, did string, operation string, record tangled.RepoCollaborator) error {
	l := log.FromContext(ctx)
	l = l.With("handler", "processCollaborator", "did", did)

	switch operation {
	case models.CommitOperationCreate, models.CommitOperationUpdate:
		subjectDid, didSlashRepo, err := h.resolveCollaboratorTarget(ctx, record)
		if err != nil {
			return err
		}

		// check perms for this user: the author of the record (did) must be
		// allowed to invite. Checking the repo owner instead would let any
		// account grant itself access by publishing a record.
		ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo)
		if err != nil {
			return fmt.Errorf("insufficient permissions: %w", err)
		}
		if !ok {
			return fmt.Errorf("insufficient permissions: %s may not invite collaborators to %s", did, didSlashRepo)
		}

		if err := h.db.AddDid(subjectDid); err != nil {
			return err
		}
		h.jc.AddDid(subjectDid)

		if err := h.e.AddCollaborator(subjectDid, rbac.ThisServer, didSlashRepo); err != nil {
			return err
		}

		l.Info("added collaborator from firehose", "subject", record.Subject, "repo", record.Repo)
		return h.fetchAndAddKeys(ctx, subjectDid)

	case models.CommitOperationDelete:
		// NOTE(review): no permission check is performed on this path —
		// confirm whether the author should be validated as on create.
		subjectDid, didSlashRepo, err := h.resolveCollaboratorTarget(ctx, record)
		if err != nil {
			return err
		}

		if err := h.e.RemoveCollaborator(subjectDid, rbac.ThisServer, didSlashRepo); err != nil {
			l.Error("failed to remove collaborator", "error", err)
			return fmt.Errorf("failed to remove collaborator: %w", err)
		}

		l.Info("removed collaborator from firehose", "subject", record.Subject, "repo", record.Repo)
	}

	return nil
}

// fetchAndAddKeys downloads the newline-separated public keys published for
// did at "<AppViewEndpoint>/keys/<did>" and stores each one in the database.
//
// A 404 response means the DID has no keys and is not an error. Any other
// non-2xx response is rejected so that an error page is never stored as key
// material. The request honours ctx and is additionally bounded by a timeout.
func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
	const (
		fetchTimeout = 30 * time.Second
		maxBodyBytes = 1 << 20 // generous upper bound for a list of keys
	)

	l := log.FromContext(ctx)

	keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
	if err != nil {
		l.Error("error building endpoint url", "did", did, "error", err.Error())
		return fmt.Errorf("error building endpoint url: %w", err)
	}

	reqCtx, cancel := context.WithTimeout(ctx, fetchTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, keysEndpoint, nil)
	if err != nil {
		l.Error("error building request", "did", did, "error", err)
		return fmt.Errorf("error building request: %w", err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		l.Error("error getting keys", "did", did, "error", err)
		return fmt.Errorf("error getting keys: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotFound {
		l.Info("no keys found for did", "did", did)
		return nil
	}
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		l.Error("error getting keys", "did", did, "status", resp.Status)
		return fmt.Errorf("error getting keys: unexpected status %s", resp.Status)
	}

	// Read one byte past the limit so an oversized body is detected rather
	// than silently truncated in the middle of a key.
	plaintext, err := io.ReadAll(io.LimitReader(resp.Body, maxBodyBytes+1))
	if err != nil {
		l.Error("error reading response body", "error", err)
		return fmt.Errorf("error reading response body: %w", err)
	}
	if len(plaintext) > maxBodyBytes {
		l.Error("error reading response body", "did", did, "error", "response too large")
		return fmt.Errorf("error reading response body: response exceeds %d bytes", maxBodyBytes)
	}

	for _, line := range strings.Split(string(plaintext), "\n") {
		// Trim so CRLF line endings or stray spaces do not end up in the key.
		key := strings.TrimSpace(line)
		if key == "" {
			continue
		}

		pk := db.PublicKey{
			Did: did,
		}
		pk.Key = key
		if err := h.db.AddPublicKey(pk); err != nil {
			l.Error("failed to add public key", "error", err)
			return fmt.Errorf("failed to add public key: %w", err)
		}
	}

	return nil
}

// processMessages dispatches a firehose event to the handler matching its
// collection. Non-commit events are ignored. After a commit event has been
// handled (successfully or not) the cursor is advanced to event.TimeUS + 1;
// a failure to persist the cursor is joined into the returned error.
func (h *Handle) processMessages(ctx context.Context, event *models.Event) (err error) {
	if event.Kind != models.EventKindCommit || event.Commit == nil {
		return nil
	}

	did := event.Did

	// err is a named result so this deferred function can actually surface a
	// cursor-save failure to the caller.
	defer func() {
		lastTimeUs := event.TimeUS + 1
		if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil {
			err = errors.Join(err, fmt.Errorf("(deferred) failed to save last time us: %w", saveErr))
		}
	}()

	raw := json.RawMessage(event.Commit.Record)

	switch event.Commit.Collection {
	case tangled.PublicKeyNSID:
		var record tangled.PublicKey
		if err := json.Unmarshal(raw, &record); err != nil {
			return fmt.Errorf("failed to unmarshal record: %w", err)
		}
		if err := h.processPublicKey(ctx, did, event.Commit.Operation, record); err != nil {
			return fmt.Errorf("failed to process public key: %w", err)
		}

	case tangled.KnotMemberNSID:
		var record tangled.KnotMember
		if err := json.Unmarshal(raw, &record); err != nil {
			return fmt.Errorf("failed to unmarshal record: %w", err)
		}
		if err := h.processKnotMember(ctx, did, event.Commit.Operation, record); err != nil {
			return fmt.Errorf("failed to process knot member: %w", err)
		}

	case tangled.RepoPullNSID:
		var record tangled.RepoPull
		if err := json.Unmarshal(raw, &record); err != nil {
			return fmt.Errorf("failed to unmarshal record: %w", err)
		}
		if err := h.processPull(ctx, did, record); err != nil {
			return fmt.Errorf("failed to process pull: %w", err)
		}

	case tangled.RepoCollaboratorNSID:
		var record tangled.RepoCollaborator
		if err := json.Unmarshal(raw, &record); err != nil {
			return fmt.Errorf("failed to unmarshal record: %w", err)
		}
		if err := h.processCollaborator(ctx, did, event.Commit.Operation, record); err != nil {
			return fmt.Errorf("failed to process collaborator: %w", err)
		}
	}

	return nil
}