forked from tangled.org/core
this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "slices" 12 "strings" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/bluesky-social/indigo/xrpc" 17 "github.com/bluesky-social/jetstream/pkg/models" 18 securejoin "github.com/cyphar/filepath-securejoin" 19 "tangled.sh/tangled.sh/core/api/tangled" 20 "tangled.sh/tangled.sh/core/idresolver" 21 "tangled.sh/tangled.sh/core/knotserver/db" 22 "tangled.sh/tangled.sh/core/knotserver/git" 23 "tangled.sh/tangled.sh/core/log" 24 "tangled.sh/tangled.sh/core/rbac" 25 "tangled.sh/tangled.sh/core/workflow" 26) 27 28func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 29 l := log.FromContext(ctx) 30 pk := db.PublicKey{ 31 Did: did, 32 PublicKey: record, 33 } 34 if err := h.db.AddPublicKey(pk); err != nil { 35 l.Error("failed to add public key", "error", err) 36 return fmt.Errorf("failed to add public key: %w", err) 37 } 38 l.Info("added public key from firehose", "did", did) 39 return nil 40} 41 42func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 43 l := log.FromContext(ctx) 44 45 if record.Domain != h.c.Server.Hostname { 46 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 47 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 48 } 49 50 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 51 if err != nil || !ok { 52 l.Error("failed to add member", "did", did) 53 return fmt.Errorf("failed to enforce permissions: %w", err) 54 } 55 56 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 57 l.Error("failed to add member", "error", err) 58 return fmt.Errorf("failed to add member: %w", err) 59 } 60 l.Info("added member from firehose", "member", record.Subject) 61 62 if err := h.db.AddDid(did); err != nil { 63 l.Error("failed to add did", "error", err) 64 return fmt.Errorf("failed to add did: %w", err) 65 } 66 h.jc.AddDid(did) 67 68 if err := h.fetchAndAddKeys(ctx, did); err != nil { 69 return fmt.Errorf("failed to fetch and add keys: %w", err) 70 } 71 72 return nil 73} 74 75func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error { 76 l := log.FromContext(ctx) 77 l = l.With("handler", "processPull") 78 l = l.With("did", did) 79 l = l.With("target_repo", record.TargetRepo) 80 l = l.With("target_branch", record.TargetBranch) 81 82 if record.Source == nil { 83 reason := "not a branch-based pull request" 84 l.Info("ignoring pull record", "reason", reason) 85 return fmt.Errorf("ignoring pull record: %s", reason) 86 } 87 88 if record.Source.Repo != nil { 89 reason := "fork based pull" 90 l.Info("ignoring pull record", "reason", reason) 91 return fmt.Errorf("ignoring pull record: %s", reason) 92 } 93 94 allDids, err := h.db.GetAllDids() 95 if err != nil { 96 return err 97 } 98 99 // presently: we only process PRs from collaborators for pipelines 100 if !slices.Contains(allDids, did) { 101 reason := "not a known did" 102 l.Info("rejecting pull record", "reason", reason) 103 return fmt.Errorf("rejected pull record: %s, %s", reason, did) 104 } 105 106 repoAt, err := syntax.ParseATURI(record.TargetRepo) 107 if err != nil { 108 return err 109 } 110 111 // resolve this aturi to extract the repo record 112 resolver := idresolver.DefaultResolver() 113 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 114 if err != nil || ident.Handle.IsInvalidHandle() { 115 return fmt.Errorf("failed to resolve handle: %w", err) 116 } 117 118 xrpcc := xrpc.Client{ 119 Host: ident.PDSEndpoint(), 120 } 121 122 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 123 if err != nil { 124 return err 125 } 126 127 repo := resp.Value.Val.(*tangled.Repo) 128 129 if repo.Knot != h.c.Server.Hostname { 130 reason := "not this knot" 131 l.Info("rejecting pull record", "reason", reason) 132 return fmt.Errorf("rejected pull record: %s", reason) 133 } 134 135 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name) 136 if err != nil { 137 return err 138 } 139 140 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 141 if err != nil { 142 return err 143 } 144 145 gr, err := git.Open(repoPath, record.Source.Branch) 146 if err != nil { 147 return err 148 } 149 150 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 151 if err != nil { 152 return err 153 } 154 155 var pipeline workflow.Pipeline 156 for _, e := range workflowDir { 157 if !e.IsFile { 158 continue 159 } 160 161 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 162 contents, err := gr.RawContent(fpath) 163 if err != nil { 164 continue 165 } 166 167 wf, err := workflow.FromFile(e.Name, contents) 168 if err != nil { 169 // TODO: log here, respond to client that is pushing 170 h.l.Error("failed to parse workflow", "err", err, "path", fpath) 171 continue 172 } 173 174 pipeline = append(pipeline, wf) 175 } 176 177 trigger := tangled.Pipeline_PullRequestTriggerData{ 178 Action: "create", 179 SourceBranch: record.Source.Branch, 180 SourceSha: record.Source.Sha, 181 TargetBranch: record.TargetBranch, 182 } 183 184 compiler := workflow.Compiler{ 185 Trigger: tangled.Pipeline_TriggerMetadata{ 186 Kind: string(workflow.TriggerKindPullRequest), 187 PullRequest: &trigger, 188 Repo: &tangled.Pipeline_TriggerRepo{ 189 Did: repo.Owner, 190 Knot: repo.Knot, 191 Repo: repo.Name, 192 }, 193 }, 194 } 195 196 cp := compiler.Compile(pipeline) 197 eventJson, err := json.Marshal(cp) 198 if err != nil { 199 return err 200 } 201 202 // do not run empty pipelines 203 if cp.Workflows == nil { 204 return nil 205 } 206 207 event := db.Event{ 208 Rkey: TID(), 209 Nsid: tangled.PipelineNSID, 210 EventJson: string(eventJson), 211 } 212 213 return h.db.InsertEvent(event, h.n) 214} 215 216// duplicated from add collaborator 217func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error { 218 repoAt, err := syntax.ParseATURI(record.Repo) 219 if err != nil { 220 return err 221 } 222 223 resolver := idresolver.DefaultResolver() 224 225 subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 226 if err != nil || subjectId.Handle.IsInvalidHandle() { 227 return err 228 } 229 230 // TODO: fix this for good, we need to fetch the record here unfortunately 231 // resolve this aturi to extract the repo record 232 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 233 if err != nil || owner.Handle.IsInvalidHandle() { 234 return fmt.Errorf("failed to resolve handle: %w", err) 235 } 236 237 xrpcc := xrpc.Client{ 238 Host: owner.PDSEndpoint(), 239 } 240 241 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 242 if err != nil { 243 return err 244 } 245 246 repo := resp.Value.Val.(*tangled.Repo) 247 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 248 249 // check perms for this user 250 if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 251 return fmt.Errorf("insufficient permissions: %w", err) 252 } 253 254 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 255 return err 256 } 257 h.jc.AddDid(subjectId.DID.String()) 258 259 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 260 return err 261 } 262 263 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 264} 265 266func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 267 l := log.FromContext(ctx) 268 269 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 270 if err != nil { 271 l.Error("error building endpoint url", "did", did, "error", err.Error()) 272 return fmt.Errorf("error building endpoint url: %w", err) 273 } 274 275 resp, err := http.Get(keysEndpoint) 276 if err != nil { 277 l.Error("error getting keys", "did", did, "error", err) 278 return fmt.Errorf("error getting keys: %w", err) 279 } 280 defer resp.Body.Close() 281 282 if resp.StatusCode == http.StatusNotFound { 283 l.Info("no keys found for did", "did", did) 284 return nil 285 } 286 287 plaintext, err := io.ReadAll(resp.Body) 288 if err != nil { 289 l.Error("error reading response body", "error", err) 290 return fmt.Errorf("error reading response body: %w", err) 291 } 292 293 for _, key := range strings.Split(string(plaintext), "\n") { 294 if key == "" { 295 continue 296 } 297 pk := db.PublicKey{ 298 Did: did, 299 } 300 pk.Key = key 301 if err := h.db.AddPublicKey(pk); err != nil { 302 l.Error("failed to add public key", "error", err) 303 return fmt.Errorf("failed to add public key: %w", err) 304 } 305 } 306 return nil 307} 308 309func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 310 did := event.Did 311 if event.Kind != models.EventKindCommit { 312 return nil 313 } 314 315 var err error 316 defer func() { 317 eventTime := event.TimeUS 318 lastTimeUs := eventTime + 1 319 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 320 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 321 } 322 }() 323 324 raw := json.RawMessage(event.Commit.Record) 325 326 switch event.Commit.Collection { 327 case tangled.PublicKeyNSID: 328 var record tangled.PublicKey 329 if err := json.Unmarshal(raw, &record); err != nil { 330 return fmt.Errorf("failed to unmarshal record: %w", err) 331 } 332 if err := h.processPublicKey(ctx, did, record); err != nil { 333 return fmt.Errorf("failed to process public key: %w", err) 334 } 335 336 case tangled.KnotMemberNSID: 337 var record tangled.KnotMember 338 if err := json.Unmarshal(raw, &record); err != nil { 339 return fmt.Errorf("failed to unmarshal record: %w", err) 340 } 341 if err := h.processKnotMember(ctx, did, record); err != nil { 342 return fmt.Errorf("failed to process knot member: %w", err) 343 } 344 345 case tangled.RepoPullNSID: 346 var record tangled.RepoPull 347 if err := json.Unmarshal(raw, &record); err != nil { 348 return fmt.Errorf("failed to unmarshal record: %w", err) 349 } 350 if err := h.processPull(ctx, did, record); err != nil { 351 return fmt.Errorf("failed to process knot member: %w", err) 352 } 353 354 case tangled.RepoCollaboratorNSID: 355 var record tangled.RepoCollaborator 356 if err := json.Unmarshal(raw, &record); err != nil { 357 return fmt.Errorf("failed to unmarshal record: %w", err) 358 } 359 if err := h.processCollaborator(ctx, did, record); err != nil { 360 return fmt.Errorf("failed to process knot member: %w", err) 361 } 362 363 } 364 365 return err 366}