forked from tangled.org/core
this repo has no description
at knot-xrpc 13 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "slices" 12 "strings" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/bluesky-social/indigo/xrpc" 17 "github.com/bluesky-social/jetstream/pkg/models" 18 securejoin "github.com/cyphar/filepath-securejoin" 19 "tangled.sh/tangled.sh/core/api/tangled" 20 "tangled.sh/tangled.sh/core/idresolver" 21 "tangled.sh/tangled.sh/core/knotserver/db" 22 "tangled.sh/tangled.sh/core/knotserver/git" 23 "tangled.sh/tangled.sh/core/log" 24 "tangled.sh/tangled.sh/core/rbac" 25 "tangled.sh/tangled.sh/core/workflow" 26) 27 28func (h *Handle) processPublicKey(ctx context.Context, did string, operation string, record tangled.PublicKey) error { 29 l := log.FromContext(ctx) 30 31 switch operation { 32 case models.CommitOperationCreate, models.CommitOperationUpdate: 33 pk := db.PublicKey{ 34 Did: did, 35 PublicKey: record, 36 } 37 if err := h.db.AddPublicKey(pk); err != nil { 38 l.Error("failed to add public key", "error", err) 39 return fmt.Errorf("failed to add public key: %w", err) 40 } 41 l.Info("added public key from firehose", "did", did) 42 43 case models.CommitOperationDelete: 44 if err := h.db.RemovePublicKey(did); err != nil { 45 l.Error("failed to remove public key", "error", err) 46 return fmt.Errorf("failed to remove public key: %w", err) 47 } 48 l.Info("removed public key (delete triggered from firehose)", "did", did) 49 } 50 51 return nil 52} 53 54func (h *Handle) processKnotMember(ctx context.Context, did string, operation string, record tangled.KnotMember) error { 55 l := log.FromContext(ctx) 56 57 switch operation { 58 case models.CommitOperationCreate, models.CommitOperationUpdate: 59 if record.Domain != h.c.Server.Hostname { 60 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 61 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 62 } 63 64 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 65 if err != nil || !ok { 66 l.Error("failed to add member", "did", did) 67 return fmt.Errorf("failed to enforce permissions: %w", err) 68 } 69 70 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 71 l.Error("failed to add member", "error", err) 72 return fmt.Errorf("failed to add member: %w", err) 73 } 74 l.Info("added member from firehose", "member", record.Subject) 75 76 if err := h.db.AddDid(did); err != nil { 77 l.Error("failed to add did", "error", err) 78 return fmt.Errorf("failed to add did: %w", err) 79 } 80 h.jc.AddDid(did) 81 82 if err := h.fetchAndAddKeys(ctx, did); err != nil { 83 return fmt.Errorf("failed to fetch and add keys: %w", err) 84 } 85 86 case models.CommitOperationDelete: 87 if err := h.e.RemoveKnotMember(rbac.ThisServer, record.Subject); err != nil { 88 l.Error("failed to remove member", "error", err) 89 return fmt.Errorf("failed to remove member: %w", err) 90 } 91 l.Info("removed member (delete triggered from firehose)", "member", record.Subject) 92 93 if err := h.db.RemoveDid(record.Subject); err != nil { 94 l.Error("failed to remove did", "error", err) 95 return fmt.Errorf("failed to remove did: %w", err) 96 } 97 h.jc.RemoveDid(record.Subject) 98 } 99 100 return nil 101} 102 103func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error { 104 l := log.FromContext(ctx) 105 l = l.With("handler", "processPull") 106 l = l.With("did", did) 107 l = l.With("target_repo", record.TargetRepo) 108 l = l.With("target_branch", record.TargetBranch) 109 110 if record.Source == nil { 111 reason := "not a branch-based pull request" 112 l.Info("ignoring pull record", "reason", reason) 113 return fmt.Errorf("ignoring pull record: %s", reason) 114 } 115 116 if record.Source.Repo != nil { 117 reason := "fork based pull" 118 l.Info("ignoring pull record", "reason", reason) 119 return fmt.Errorf("ignoring pull record: %s", reason) 120 } 121 122 allDids, err := h.db.GetAllDids() 123 if err != nil { 124 return err 125 } 126 127 // presently: we only process PRs from collaborators for pipelines 128 if !slices.Contains(allDids, did) { 129 reason := "not a known did" 130 l.Info("rejecting pull record", "reason", reason) 131 return fmt.Errorf("rejected pull record: %s, %s", reason, did) 132 } 133 134 repoAt, err := syntax.ParseATURI(record.TargetRepo) 135 if err != nil { 136 return err 137 } 138 139 // resolve this aturi to extract the repo record 140 resolver := idresolver.DefaultResolver() 141 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 142 if err != nil || ident.Handle.IsInvalidHandle() { 143 return fmt.Errorf("failed to resolve handle: %w", err) 144 } 145 146 xrpcc := xrpc.Client{ 147 Host: ident.PDSEndpoint(), 148 } 149 150 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 151 if err != nil { 152 return err 153 } 154 155 repo := resp.Value.Val.(*tangled.Repo) 156 157 if repo.Knot != h.c.Server.Hostname { 158 reason := "not this knot" 159 l.Info("rejecting pull record", "reason", reason) 160 return fmt.Errorf("rejected pull record: %s", reason) 161 } 162 163 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name) 164 if err != nil { 165 return err 166 } 167 168 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 169 if err != nil { 170 return err 171 } 172 173 gr, err := git.Open(repoPath, record.Source.Branch) 174 if err != nil { 175 return err 176 } 177 178 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 179 if err != nil { 180 return err 181 } 182 183 var pipeline workflow.Pipeline 184 for _, e := range workflowDir { 185 if !e.IsFile { 186 continue 187 } 188 189 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 190 contents, err := gr.RawContent(fpath) 191 if err != nil { 192 continue 193 } 194 195 wf, err := workflow.FromFile(e.Name, contents) 196 if err != nil { 197 // TODO: log here, respond to client that is pushing 198 h.l.Error("failed to parse workflow", "err", err, "path", fpath) 199 continue 200 } 201 202 pipeline = append(pipeline, wf) 203 } 204 205 trigger := tangled.Pipeline_PullRequestTriggerData{ 206 Action: "create", 207 SourceBranch: record.Source.Branch, 208 SourceSha: record.Source.Sha, 209 TargetBranch: record.TargetBranch, 210 } 211 212 compiler := workflow.Compiler{ 213 Trigger: tangled.Pipeline_TriggerMetadata{ 214 Kind: string(workflow.TriggerKindPullRequest), 215 PullRequest: &trigger, 216 Repo: &tangled.Pipeline_TriggerRepo{ 217 Did: repo.Owner, 218 Knot: repo.Knot, 219 Repo: repo.Name, 220 }, 221 }, 222 } 223 224 cp := compiler.Compile(pipeline) 225 eventJson, err := json.Marshal(cp) 226 if err != nil { 227 return err 228 } 229 230 // do not run empty pipelines 231 if cp.Workflows == nil { 232 return nil 233 } 234 235 event := db.Event{ 236 Rkey: TID(), 237 Nsid: tangled.PipelineNSID, 238 EventJson: string(eventJson), 239 } 240 241 return h.db.InsertEvent(event, h.n) 242} 243 244// duplicated from add collaborator 245func (h *Handle) processCollaborator(ctx context.Context, did string, operation string, record tangled.RepoCollaborator) error { 246 l := log.FromContext(ctx) 247 l = l.With("handler", "processCollaborator", "did", did) 248 249 switch operation { 250 case models.CommitOperationCreate, models.CommitOperationUpdate: 251 repoAt, err := syntax.ParseATURI(record.Repo) 252 if err != nil { 253 return err 254 } 255 256 resolver := h.resolver 257 258 subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 259 if err != nil || subjectId.Handle.IsInvalidHandle() { 260 return err 261 } 262 263 // TODO: fix this for good, we need to fetch the record here unfortunately 264 // resolve this aturi to extract the repo record 265 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 266 if err != nil || owner.Handle.IsInvalidHandle() { 267 return fmt.Errorf("failed to resolve handle: %w", err) 268 } 269 270 xrpcc := xrpc.Client{ 271 Host: owner.PDSEndpoint(), 272 } 273 274 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 275 if err != nil { 276 return err 277 } 278 279 repo := resp.Value.Val.(*tangled.Repo) 280 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 281 282 // check perms for this user 283 if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 284 return fmt.Errorf("insufficient permissions: %w", err) 285 } 286 287 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 288 return err 289 } 290 h.jc.AddDid(subjectId.DID.String()) 291 292 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 293 return err 294 } 295 296 l.Info("added collaborator from firehose", "subject", record.Subject, "repo", record.Repo) 297 298 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 299 300 case models.CommitOperationDelete: 301 repoAt, err := syntax.ParseATURI(record.Repo) 302 if err != nil { 303 return err 304 } 305 306 resolver := h.resolver 307 308 subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 309 if err != nil || subjectId.Handle.IsInvalidHandle() { 310 return err 311 } 312 313 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 314 if err != nil || owner.Handle.IsInvalidHandle() { 315 return fmt.Errorf("failed to resolve handle: %w", err) 316 } 317 318 xrpcc := xrpc.Client{ 319 Host: owner.PDSEndpoint(), 320 } 321 322 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 323 if err != nil { 324 return err 325 } 326 327 repo := resp.Value.Val.(*tangled.Repo) 328 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 329 330 if err := h.e.RemoveCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 331 l.Error("failed to remove collaborator", "error", err) 332 return fmt.Errorf("failed to remove collaborator: %w", err) 333 } 334 335 l.Info("removed collaborator from firehose", "subject", record.Subject, "repo", record.Repo) 336 } 337 338 return nil 339} 340 341func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 342 l := log.FromContext(ctx) 343 344 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 345 if err != nil { 346 l.Error("error building endpoint url", "did", did, "error", err.Error()) 347 return fmt.Errorf("error building endpoint url: %w", err) 348 } 349 350 resp, err := http.Get(keysEndpoint) 351 if err != nil { 352 l.Error("error getting keys", "did", did, "error", err) 353 return fmt.Errorf("error getting keys: %w", err) 354 } 355 defer resp.Body.Close() 356 357 if resp.StatusCode == http.StatusNotFound { 358 l.Info("no keys found for did", "did", did) 359 return nil 360 } 361 362 plaintext, err := io.ReadAll(resp.Body) 363 if err != nil { 364 l.Error("error reading response body", "error", err) 365 return fmt.Errorf("error reading response body: %w", err) 366 } 367 368 for _, key := range strings.Split(string(plaintext), "\n") { 369 if key == "" { 370 continue 371 } 372 pk := db.PublicKey{ 373 Did: did, 374 } 375 pk.Key = key 376 if err := h.db.AddPublicKey(pk); err != nil { 377 l.Error("failed to add public key", "error", err) 378 return fmt.Errorf("failed to add public key: %w", err) 379 } 380 } 381 return nil 382} 383 384func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 385 did := event.Did 386 if event.Kind != models.EventKindCommit { 387 return nil 388 } 389 390 var err error 391 defer func() { 392 eventTime := event.TimeUS 393 lastTimeUs := eventTime + 1 394 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 395 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 396 } 397 }() 398 399 raw := json.RawMessage(event.Commit.Record) 400 401 switch event.Commit.Collection { 402 case tangled.PublicKeyNSID: 403 var record tangled.PublicKey 404 if err := json.Unmarshal(raw, &record); err != nil { 405 return fmt.Errorf("failed to unmarshal record: %w", err) 406 } 407 if err := h.processPublicKey(ctx, did, event.Commit.Operation, record); err != nil { 408 return fmt.Errorf("failed to process public key: %w", err) 409 } 410 411 case tangled.KnotMemberNSID: 412 var record tangled.KnotMember 413 if err := json.Unmarshal(raw, &record); err != nil { 414 return fmt.Errorf("failed to unmarshal record: %w", err) 415 } 416 if err := h.processKnotMember(ctx, did, event.Commit.Operation, record); err != nil { 417 return fmt.Errorf("failed to process knot member: %w", err) 418 } 419 420 case tangled.RepoPullNSID: 421 var record tangled.RepoPull 422 if err := json.Unmarshal(raw, &record); err != nil { 423 return fmt.Errorf("failed to unmarshal record: %w", err) 424 } 425 if err := h.processPull(ctx, did, record); err != nil { 426 return fmt.Errorf("failed to process knot member: %w", err) 427 } 428 429 case tangled.RepoCollaboratorNSID: 430 var record tangled.RepoCollaborator 431 if err := json.Unmarshal(raw, &record); err != nil { 432 return fmt.Errorf("failed to unmarshal record: %w", err) 433 } 434 if err := h.processCollaborator(ctx, did, event.Commit.Operation, record); err != nil { 435 return fmt.Errorf("failed to process collaborator: %w", err) 436 } 437 } 438 439 return err 440}