forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/xrpc" 16 "github.com/bluesky-social/jetstream/pkg/models" 17 securejoin "github.com/cyphar/filepath-securejoin" 18 "tangled.org/core/api/tangled" 19 "tangled.org/core/knotserver/db" 20 "tangled.org/core/knotserver/git" 21 "tangled.org/core/log" 22 "tangled.org/core/rbac" 23 "tangled.org/core/workflow" 24) 25 26func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error { 27 l := log.FromContext(ctx) 28 raw := json.RawMessage(event.Commit.Record) 29 did := event.Did 30 31 var record tangled.PublicKey 32 if err := json.Unmarshal(raw, &record); err != nil { 33 return fmt.Errorf("failed to unmarshal record: %w", err) 34 } 35 36 pk := db.PublicKey{ 37 Did: did, 38 PublicKey: record, 39 } 40 if err := h.db.AddPublicKey(pk); err != nil { 41 l.Error("failed to add public key", "error", err) 42 return fmt.Errorf("failed to add public key: %w", err) 43 } 44 l.Info("added public key from firehose", "did", did) 45 return nil 46} 47 48func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error { 49 l := log.FromContext(ctx) 50 raw := json.RawMessage(event.Commit.Record) 51 did := event.Did 52 53 var record tangled.KnotMember 54 if err := json.Unmarshal(raw, &record); err != nil { 55 return fmt.Errorf("failed to unmarshal record: %w", err) 56 } 57 58 if record.Domain != h.c.Server.Hostname { 59 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 60 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 61 } 62 63 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 64 if err != nil || !ok { 65 l.Error("failed to add member", "did", did) 66 return fmt.Errorf("failed to enforce permissions: %w", err) 67 } 68 69 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 70 l.Error("failed to add member", "error", err) 71 return fmt.Errorf("failed to add member: %w", err) 72 } 73 l.Info("added member from firehose", "member", record.Subject) 74 75 if err := h.db.AddDid(record.Subject); err != nil { 76 l.Error("failed to add did", "error", err) 77 return fmt.Errorf("failed to add did: %w", err) 78 } 79 h.jc.AddDid(record.Subject) 80 81 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 82 return fmt.Errorf("failed to fetch and add keys: %w", err) 83 } 84 85 return nil 86} 87 88func (h *Knot) processPull(ctx context.Context, event *models.Event) error { 89 raw := json.RawMessage(event.Commit.Record) 90 did := event.Did 91 92 var record tangled.RepoPull 93 if err := json.Unmarshal(raw, &record); err != nil { 94 return fmt.Errorf("failed to unmarshal record: %w", err) 95 } 96 97 l := log.FromContext(ctx) 98 l = l.With("handler", "processPull") 99 l = l.With("did", did) 100 101 if record.Target == nil { 102 return fmt.Errorf("ignoring pull record: target repo is nil") 103 } 104 105 l = l.With("target_repo", record.Target.Repo) 106 l = l.With("target_branch", record.Target.Branch) 107 108 if record.Source == nil { 109 return fmt.Errorf("ignoring pull record: not a branch-based pull request") 110 } 111 112 if record.Source.Repo != nil { 113 return fmt.Errorf("ignoring pull record: fork based pull") 114 } 115 116 repoAt, err := syntax.ParseATURI(record.Target.Repo) 117 if err != nil { 118 return fmt.Errorf("failed to parse ATURI: %w", err) 119 } 120 121 // resolve this aturi to extract the repo record 122 ident, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 123 if err != nil || ident.Handle.IsInvalidHandle() { 124 return fmt.Errorf("failed to resolve handle: %w", err) 125 } 126 127 xrpcc := xrpc.Client{ 128 Host: ident.PDSEndpoint(), 129 } 130 131 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 132 if err != nil { 133 return fmt.Errorf("failed to resolver repo: %w", err) 134 } 135 136 repo := resp.Value.Val.(*tangled.Repo) 137 138 if repo.Knot != h.c.Server.Hostname { 139 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 140 } 141 142 didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) 143 if err != nil { 144 return fmt.Errorf("failed to construct relative repo path: %w", err) 145 } 146 147 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 148 if err != nil { 149 return fmt.Errorf("failed to construct absolute repo path: %w", err) 150 } 151 152 gr, err := git.Open(repoPath, record.Source.Sha) 153 if err != nil { 154 return fmt.Errorf("failed to open git repository: %w", err) 155 } 156 157 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 158 if err != nil { 159 return fmt.Errorf("failed to open workflow directory: %w", err) 160 } 161 162 var pipeline workflow.RawPipeline 163 for _, e := range workflowDir { 164 if !e.IsFile() { 165 continue 166 } 167 168 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 169 contents, err := gr.RawContent(fpath) 170 if err != nil { 171 continue 172 } 173 174 pipeline = append(pipeline, workflow.RawWorkflow{ 175 Name: e.Name, 176 Contents: contents, 177 }) 178 } 179 180 trigger := tangled.Pipeline_PullRequestTriggerData{ 181 Action: "create", 182 SourceBranch: record.Source.Branch, 183 SourceSha: record.Source.Sha, 184 TargetBranch: record.Target.Branch, 185 } 186 187 compiler := workflow.Compiler{ 188 Trigger: tangled.Pipeline_TriggerMetadata{ 189 Kind: string(workflow.TriggerKindPullRequest), 190 PullRequest: &trigger, 191 Repo: &tangled.Pipeline_TriggerRepo{ 192 Did: ident.DID.String(), 193 Knot: repo.Knot, 194 Repo: repo.Name, 195 }, 196 }, 197 } 198 199 cp := compiler.Compile(compiler.Parse(pipeline)) 200 eventJson, err := json.Marshal(cp) 201 if err != nil { 202 return fmt.Errorf("failed to marshal pipeline event: %w", err) 203 } 204 205 // do not run empty pipelines 206 if cp.Workflows == nil { 207 return nil 208 } 209 210 ev := db.Event{ 211 Rkey: TID(), 212 Nsid: tangled.PipelineNSID, 213 EventJson: string(eventJson), 214 } 215 216 return h.db.InsertEvent(ev, h.n) 217} 218 219// duplicated from add collaborator 220func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error { 221 raw := json.RawMessage(event.Commit.Record) 222 did := event.Did 223 224 var record tangled.RepoCollaborator 225 if err := json.Unmarshal(raw, &record); err != nil { 226 return fmt.Errorf("failed to unmarshal record: %w", err) 227 } 228 229 repoAt, err := syntax.ParseATURI(record.Repo) 230 if err != nil { 231 return err 232 } 233 234 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 235 if err != nil || subjectId.Handle.IsInvalidHandle() { 236 return err 237 } 238 239 // TODO: fix this for good, we need to fetch the record here unfortunately 240 // resolve this aturi to extract the repo record 241 owner, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 242 if err != nil || owner.Handle.IsInvalidHandle() { 243 return fmt.Errorf("failed to resolve handle: %w", err) 244 } 245 246 xrpcc := xrpc.Client{ 247 Host: owner.PDSEndpoint(), 248 } 249 250 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 251 if err != nil { 252 return err 253 } 254 255 repo := resp.Value.Val.(*tangled.Repo) 256 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 257 258 // check perms for this user 259 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo) 260 if err != nil { 261 return fmt.Errorf("failed to check permissions: %w", err) 262 } 263 if !ok { 264 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo) 265 } 266 267 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 268 return err 269 } 270 h.jc.AddDid(subjectId.DID.String()) 271 272 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 273 return err 274 } 275 276 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 277} 278 279func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 280 l := log.FromContext(ctx) 281 282 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 283 if err != nil { 284 l.Error("error building endpoint url", "did", did, "error", err.Error()) 285 return fmt.Errorf("error building endpoint url: %w", err) 286 } 287 288 resp, err := http.Get(keysEndpoint) 289 if err != nil { 290 l.Error("error getting keys", "did", did, "error", err) 291 return fmt.Errorf("error getting keys: %w", err) 292 } 293 defer resp.Body.Close() 294 295 if resp.StatusCode == http.StatusNotFound { 296 l.Info("no keys found for did", "did", did) 297 return nil 298 } 299 300 plaintext, err := io.ReadAll(resp.Body) 301 if err != nil { 302 l.Error("error reading response body", "error", err) 303 return fmt.Errorf("error reading response body: %w", err) 304 } 305 306 for key := range strings.SplitSeq(string(plaintext), "\n") { 307 if key == "" { 308 continue 309 } 310 pk := db.PublicKey{ 311 Did: did, 312 } 313 pk.Key = key 314 if err := h.db.AddPublicKey(pk); err != nil { 315 l.Error("failed to add public key", "error", err) 316 return fmt.Errorf("failed to add public key: %w", err) 317 } 318 } 319 return nil 320} 321 322func (h *Knot) processMessages(ctx context.Context, event *models.Event) error { 323 if event.Kind != models.EventKindCommit { 324 return nil 325 } 326 327 var err error 328 defer func() { 329 eventTime := event.TimeUS 330 lastTimeUs := eventTime + 1 331 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 332 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 333 } 334 }() 335 336 switch event.Commit.Collection { 337 case tangled.PublicKeyNSID: 338 err = h.processPublicKey(ctx, event) 339 case tangled.KnotMemberNSID: 340 err = h.processKnotMember(ctx, event) 341 case tangled.RepoPullNSID: 342 err = h.processPull(ctx, event) 343 case tangled.RepoCollaboratorNSID: 344 err = h.processCollaborator(ctx, event) 345 } 346 347 if err != nil { 348 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 349 } 350 351 return nil 352}