forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "slices" 12 "strings" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/bluesky-social/indigo/xrpc" 17 "github.com/bluesky-social/jetstream/pkg/models" 18 securejoin "github.com/cyphar/filepath-securejoin" 19 "tangled.sh/tangled.sh/core/api/tangled" 20 "tangled.sh/tangled.sh/core/idresolver" 21 "tangled.sh/tangled.sh/core/knotserver/db" 22 "tangled.sh/tangled.sh/core/knotserver/git" 23 "tangled.sh/tangled.sh/core/log" 24 "tangled.sh/tangled.sh/core/rbac" 25 "tangled.sh/tangled.sh/core/workflow" 26) 27 28func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 29 l := log.FromContext(ctx) 30 31 allDids, err := h.db.GetAllDids() 32 if err != nil { 33 return err 34 } 35 36 // only process public keys from known DIDs 37 if !slices.Contains(allDids, did) { 38 reason := "not a known did" 39 l.Debug("rejecting public key record", "reason", reason, "did", did) 40 return nil 41 } 42 43 pk := db.PublicKey{ 44 Did: did, 45 PublicKey: record, 46 } 47 if err := h.db.AddPublicKey(pk); err != nil { 48 l.Error("failed to add public key", "error", err) 49 return fmt.Errorf("failed to add public key: %w", err) 50 } 51 l.Info("added public key from firehose", "did", did) 52 return nil 53} 54 55func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 56 l := log.FromContext(ctx) 57 58 if record.Domain != h.c.Server.Hostname { 59 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 60 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 61 } 62 63 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 64 if err != nil || !ok { 65 l.Error("failed to add member", "did", did) 66 return fmt.Errorf("failed to enforce permissions: %w", err) 67 } 68 69 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 70 l.Error("failed to add member", "error", err) 71 return fmt.Errorf("failed to add member: %w", err) 72 } 73 l.Info("added member from firehose", "member", record.Subject) 74 75 if err := h.db.AddDid(did); err != nil { 76 l.Error("failed to add did", "error", err) 77 return fmt.Errorf("failed to add did: %w", err) 78 } 79 h.jc.AddDid(did) 80 81 if err := h.fetchAndAddKeys(ctx, did); err != nil { 82 return fmt.Errorf("failed to fetch and add keys: %w", err) 83 } 84 85 return nil 86} 87 88func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error { 89 l := log.FromContext(ctx) 90 l = l.With("handler", "processPull") 91 l = l.With("did", did) 92 l = l.With("target_repo", record.TargetRepo) 93 l = l.With("target_branch", record.TargetBranch) 94 95 if record.Source == nil { 96 reason := "not a branch-based pull request" 97 l.Info("ignoring pull record", "reason", reason) 98 return fmt.Errorf("ignoring pull record: %s", reason) 99 } 100 101 if record.Source.Repo != nil { 102 reason := "fork based pull" 103 l.Info("ignoring pull record", "reason", reason) 104 return fmt.Errorf("ignoring pull record: %s", reason) 105 } 106 107 allDids, err := h.db.GetAllDids() 108 if err != nil { 109 return err 110 } 111 112 // presently: we only process PRs from collaborators for pipelines 113 if !slices.Contains(allDids, did) { 114 reason := "not a known did" 115 l.Debug("rejecting pull record", "reason", reason) 116 return nil 117 } 118 119 repoAt, err := syntax.ParseATURI(record.TargetRepo) 120 if err != nil { 121 return err 122 } 123 124 // resolve this aturi to extract the repo record 125 resolver := idresolver.DefaultResolver() 126 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 127 if err != nil || ident.Handle.IsInvalidHandle() { 128 return fmt.Errorf("failed to resolve handle: %w", err) 129 } 130 131 xrpcc := xrpc.Client{ 132 Host: ident.PDSEndpoint(), 133 } 134 135 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 136 if err != nil { 137 return err 138 } 139 140 repo := resp.Value.Val.(*tangled.Repo) 141 142 if repo.Knot != h.c.Server.Hostname { 143 reason := "not this knot" 144 l.Debug("rejecting pull record", "reason", reason) 145 return nil 146 } 147 148 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name) 149 if err != nil { 150 return err 151 } 152 153 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 154 if err != nil { 155 return err 156 } 157 158 gr, err := git.Open(repoPath, record.Source.Branch) 159 if err != nil { 160 return err 161 } 162 163 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 164 if err != nil { 165 return err 166 } 167 168 var pipeline workflow.Pipeline 169 for _, e := range workflowDir { 170 if !e.IsFile { 171 continue 172 } 173 174 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 175 contents, err := gr.RawContent(fpath) 176 if err != nil { 177 continue 178 } 179 180 wf, err := workflow.FromFile(e.Name, contents) 181 if err != nil { 182 // TODO: log here, respond to client that is pushing 183 h.l.Error("failed to parse workflow", "err", err, "path", fpath) 184 continue 185 } 186 187 pipeline = append(pipeline, wf) 188 } 189 190 trigger := tangled.Pipeline_PullRequestTriggerData{ 191 Action: "create", 192 SourceBranch: record.Source.Branch, 193 SourceSha: record.Source.Sha, 194 TargetBranch: record.TargetBranch, 195 } 196 197 compiler := workflow.Compiler{ 198 Trigger: tangled.Pipeline_TriggerMetadata{ 199 Kind: string(workflow.TriggerKindPullRequest), 200 PullRequest: &trigger, 201 Repo: &tangled.Pipeline_TriggerRepo{ 202 Did: repo.Owner, 203 Knot: repo.Knot, 204 Repo: repo.Name, 205 }, 206 }, 207 } 208 209 cp := compiler.Compile(pipeline) 210 eventJson, err := json.Marshal(cp) 211 if err != nil { 212 return err 213 } 214 215 // do not run empty pipelines 216 if cp.Workflows == nil { 217 return nil 218 } 219 220 event := db.Event{ 221 Rkey: TID(), 222 Nsid: tangled.PipelineNSID, 223 EventJson: string(eventJson), 224 } 225 226 return h.db.InsertEvent(event, h.n) 227} 228 229// duplicated from add collaborator 230func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error { 231 repoAt, err := syntax.ParseATURI(record.Repo) 232 if err != nil { 233 return err 234 } 235 236 resolver := idresolver.DefaultResolver() 237 238 subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 239 if err != nil || subjectId.Handle.IsInvalidHandle() { 240 return err 241 } 242 243 // TODO: fix this for good, we need to fetch the record here unfortunately 244 // resolve this aturi to extract the repo record 245 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 246 if err != nil || owner.Handle.IsInvalidHandle() { 247 return fmt.Errorf("failed to resolve handle: %w", err) 248 } 249 250 xrpcc := xrpc.Client{ 251 Host: owner.PDSEndpoint(), 252 } 253 254 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 255 if err != nil { 256 return err 257 } 258 259 repo := resp.Value.Val.(*tangled.Repo) 260 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 261 262 // check perms for this user 263 if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 264 return fmt.Errorf("insufficient permissions: %w", err) 265 } 266 267 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 268 return err 269 } 270 h.jc.AddDid(subjectId.DID.String()) 271 272 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 273 return err 274 } 275 276 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 277} 278 279func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 280 l := log.FromContext(ctx) 281 282 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 283 if err != nil { 284 l.Error("error building endpoint url", "did", did, "error", err.Error()) 285 return fmt.Errorf("error building endpoint url: %w", err) 286 } 287 288 resp, err := http.Get(keysEndpoint) 289 if err != nil { 290 l.Error("error getting keys", "did", did, "error", err) 291 return fmt.Errorf("error getting keys: %w", err) 292 } 293 defer resp.Body.Close() 294 295 if resp.StatusCode == http.StatusNotFound { 296 l.Info("no keys found for did", "did", did) 297 return nil 298 } 299 300 plaintext, err := io.ReadAll(resp.Body) 301 if err != nil { 302 l.Error("error reading response body", "error", err) 303 return fmt.Errorf("error reading response body: %w", err) 304 } 305 306 for _, key := range strings.Split(string(plaintext), "\n") { 307 if key == "" { 308 continue 309 } 310 pk := db.PublicKey{ 311 Did: did, 312 } 313 pk.Key = key 314 if err := h.db.AddPublicKey(pk); err != nil { 315 l.Error("failed to add public key", "error", err) 316 return fmt.Errorf("failed to add public key: %w", err) 317 } 318 } 319 return nil 320} 321 322func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 323 did := event.Did 324 if event.Kind != models.EventKindCommit { 325 return nil 326 } 327 328 var err error 329 defer func() { 330 eventTime := event.TimeUS 331 lastTimeUs := eventTime + 1 332 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 333 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 334 } 335 }() 336 337 raw := json.RawMessage(event.Commit.Record) 338 339 switch event.Commit.Collection { 340 case tangled.PublicKeyNSID: 341 var record tangled.PublicKey 342 if err := json.Unmarshal(raw, &record); err != nil { 343 return fmt.Errorf("failed to unmarshal record: %w", err) 344 } 345 if err := h.processPublicKey(ctx, did, record); err != nil { 346 return fmt.Errorf("failed to process public key: %w", err) 347 } 348 349 case tangled.KnotMemberNSID: 350 var record tangled.KnotMember 351 if err := json.Unmarshal(raw, &record); err != nil { 352 return fmt.Errorf("failed to unmarshal record: %w", err) 353 } 354 if err := h.processKnotMember(ctx, did, record); err != nil { 355 return fmt.Errorf("failed to process knot member: %w", err) 356 } 357 358 case tangled.RepoPullNSID: 359 var record tangled.RepoPull 360 if err := json.Unmarshal(raw, &record); err != nil { 361 return fmt.Errorf("failed to unmarshal record: %w", err) 362 } 363 if err := h.processPull(ctx, did, record); err != nil { 364 return fmt.Errorf("failed to process knot member: %w", err) 365 } 366 367 case tangled.RepoCollaboratorNSID: 368 var record tangled.RepoCollaborator 369 if err := json.Unmarshal(raw, &record); err != nil { 370 return fmt.Errorf("failed to unmarshal record: %w", err) 371 } 372 if err := h.processCollaborator(ctx, did, record); err != nil { 373 return fmt.Errorf("failed to process knot member: %w", err) 374 } 375 376 } 377 378 return err 379}