forked from tangled.org/core
this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "slices" 12 "strings" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/bluesky-social/indigo/xrpc" 17 "github.com/bluesky-social/jetstream/pkg/models" 18 securejoin "github.com/cyphar/filepath-securejoin" 19 "tangled.sh/tangled.sh/core/api/tangled" 20 "tangled.sh/tangled.sh/core/idresolver" 21 "tangled.sh/tangled.sh/core/knotserver/db" 22 "tangled.sh/tangled.sh/core/knotserver/git" 23 "tangled.sh/tangled.sh/core/log" 24 "tangled.sh/tangled.sh/core/rbac" 25 "tangled.sh/tangled.sh/core/workflow" 26) 27 28func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error { 29 l := log.FromContext(ctx) 30 raw := json.RawMessage(event.Commit.Record) 31 did := event.Did 32 33 var record tangled.PublicKey 34 if err := json.Unmarshal(raw, &record); err != nil { 35 return fmt.Errorf("failed to unmarshal record: %w", err) 36 } 37 38 pk := db.PublicKey{ 39 Did: did, 40 PublicKey: record, 41 } 42 if err := h.db.AddPublicKey(pk); err != nil { 43 l.Error("failed to add public key", "error", err) 44 return fmt.Errorf("failed to add public key: %w", err) 45 } 46 l.Info("added public key from firehose", "did", did) 47 return nil 48} 49 50func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error { 51 l := log.FromContext(ctx) 52 raw := json.RawMessage(event.Commit.Record) 53 did := event.Did 54 55 var record tangled.KnotMember 56 if err := json.Unmarshal(raw, &record); err != nil { 57 return fmt.Errorf("failed to unmarshal record: %w", err) 58 } 59 60 if record.Domain != h.c.Server.Hostname { 61 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 62 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 63 } 64 65 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 66 if err != nil || !ok { 67 l.Error("failed to add member", "did", did) 68 return fmt.Errorf("failed to enforce permissions: %w", err) 69 } 70 71 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 72 l.Error("failed to add member", "error", err) 73 return fmt.Errorf("failed to add member: %w", err) 74 } 75 l.Info("added member from firehose", "member", record.Subject) 76 77 if err := h.db.AddDid(did); err != nil { 78 l.Error("failed to add did", "error", err) 79 return fmt.Errorf("failed to add did: %w", err) 80 } 81 h.jc.AddDid(did) 82 83 if err := h.fetchAndAddKeys(ctx, did); err != nil { 84 return fmt.Errorf("failed to fetch and add keys: %w", err) 85 } 86 87 return nil 88} 89 90func (h *Handle) processPull(ctx context.Context, event *models.Event) error { 91 raw := json.RawMessage(event.Commit.Record) 92 did := event.Did 93 94 var record tangled.RepoPull 95 if err := json.Unmarshal(raw, &record); err != nil { 96 return fmt.Errorf("failed to unmarshal record: %w", err) 97 } 98 99 l := log.FromContext(ctx) 100 l = l.With("handler", "processPull") 101 l = l.With("did", did) 102 l = l.With("target_repo", record.TargetRepo) 103 l = l.With("target_branch", record.TargetBranch) 104 105 if record.Source == nil { 106 reason := "not a branch-based pull request" 107 l.Info("ignoring pull record", "reason", reason) 108 return fmt.Errorf("ignoring pull record: %s", reason) 109 } 110 111 if record.Source.Repo != nil { 112 reason := "fork based pull" 113 l.Info("ignoring pull record", "reason", reason) 114 return fmt.Errorf("ignoring pull record: %s", reason) 115 } 116 117 allDids, err := h.db.GetAllDids() 118 if err != nil { 119 return err 120 } 121 122 // presently: we only process PRs from collaborators for pipelines 123 if !slices.Contains(allDids, did) { 124 reason := "not a known did" 125 l.Info("rejecting pull record", "reason", reason) 126 return fmt.Errorf("rejected pull record: %s, %s", reason, did) 127 } 128 129 repoAt, err := syntax.ParseATURI(record.TargetRepo) 130 if err != nil { 131 return err 132 } 133 134 // resolve this aturi to extract the repo record 135 resolver := idresolver.DefaultResolver() 136 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 137 if err != nil || ident.Handle.IsInvalidHandle() { 138 return fmt.Errorf("failed to resolve handle: %w", err) 139 } 140 141 xrpcc := xrpc.Client{ 142 Host: ident.PDSEndpoint(), 143 } 144 145 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 146 if err != nil { 147 return err 148 } 149 150 repo := resp.Value.Val.(*tangled.Repo) 151 152 if repo.Knot != h.c.Server.Hostname { 153 reason := "not this knot" 154 l.Info("rejecting pull record", "reason", reason) 155 return fmt.Errorf("rejected pull record: %s", reason) 156 } 157 158 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name) 159 if err != nil { 160 return err 161 } 162 163 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 164 if err != nil { 165 return err 166 } 167 168 gr, err := git.Open(repoPath, record.Source.Branch) 169 if err != nil { 170 return err 171 } 172 173 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 174 if err != nil { 175 return err 176 } 177 178 var pipeline workflow.RawPipeline 179 for _, e := range workflowDir { 180 if !e.IsFile { 181 continue 182 } 183 184 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 185 contents, err := gr.RawContent(fpath) 186 if err != nil { 187 continue 188 } 189 190 pipeline = append(pipeline, workflow.RawWorkflow{ 191 Name: e.Name, 192 Contents: contents, 193 }) 194 } 195 196 trigger := tangled.Pipeline_PullRequestTriggerData{ 197 Action: "create", 198 SourceBranch: record.Source.Branch, 199 SourceSha: record.Source.Sha, 200 TargetBranch: record.TargetBranch, 201 } 202 203 compiler := workflow.Compiler{ 204 Trigger: tangled.Pipeline_TriggerMetadata{ 205 Kind: string(workflow.TriggerKindPullRequest), 206 PullRequest: &trigger, 207 Repo: &tangled.Pipeline_TriggerRepo{ 208 Did: repo.Owner, 209 Knot: repo.Knot, 210 Repo: repo.Name, 211 }, 212 }, 213 } 214 215 cp := compiler.Compile(compiler.Parse(pipeline)) 216 eventJson, err := json.Marshal(cp) 217 if err != nil { 218 return err 219 } 220 221 // do not run empty pipelines 222 if cp.Workflows == nil { 223 return nil 224 } 225 226 ev := db.Event{ 227 Rkey: TID(), 228 Nsid: tangled.PipelineNSID, 229 EventJson: string(eventJson), 230 } 231 232 return h.db.InsertEvent(ev, h.n) 233} 234 235// duplicated from add collaborator 236func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error { 237 raw := json.RawMessage(event.Commit.Record) 238 did := event.Did 239 240 var record tangled.RepoCollaborator 241 if err := json.Unmarshal(raw, &record); err != nil { 242 return fmt.Errorf("failed to unmarshal record: %w", err) 243 } 244 245 repoAt, err := syntax.ParseATURI(record.Repo) 246 if err != nil { 247 return err 248 } 249 250 resolver := idresolver.DefaultResolver() 251 252 subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 253 if err != nil || subjectId.Handle.IsInvalidHandle() { 254 return err 255 } 256 257 // TODO: fix this for good, we need to fetch the record here unfortunately 258 // resolve this aturi to extract the repo record 259 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 260 if err != nil || owner.Handle.IsInvalidHandle() { 261 return fmt.Errorf("failed to resolve handle: %w", err) 262 } 263 264 xrpcc := xrpc.Client{ 265 Host: owner.PDSEndpoint(), 266 } 267 268 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 269 if err != nil { 270 return err 271 } 272 273 repo := resp.Value.Val.(*tangled.Repo) 274 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 275 276 // check perms for this user 277 if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil { 278 return fmt.Errorf("insufficient permissions: %w", err) 279 } 280 281 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 282 return err 283 } 284 h.jc.AddDid(subjectId.DID.String()) 285 286 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 287 return err 288 } 289 290 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 291} 292 293func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 294 l := log.FromContext(ctx) 295 296 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 297 if err != nil { 298 l.Error("error building endpoint url", "did", did, "error", err.Error()) 299 return fmt.Errorf("error building endpoint url: %w", err) 300 } 301 302 resp, err := http.Get(keysEndpoint) 303 if err != nil { 304 l.Error("error getting keys", "did", did, "error", err) 305 return fmt.Errorf("error getting keys: %w", err) 306 } 307 defer resp.Body.Close() 308 309 if resp.StatusCode == http.StatusNotFound { 310 l.Info("no keys found for did", "did", did) 311 return nil 312 } 313 314 plaintext, err := io.ReadAll(resp.Body) 315 if err != nil { 316 l.Error("error reading response body", "error", err) 317 return fmt.Errorf("error reading response body: %w", err) 318 } 319 320 for _, key := range strings.Split(string(plaintext), "\n") { 321 if key == "" { 322 continue 323 } 324 pk := db.PublicKey{ 325 Did: did, 326 } 327 pk.Key = key 328 if err := h.db.AddPublicKey(pk); err != nil { 329 l.Error("failed to add public key", "error", err) 330 return fmt.Errorf("failed to add public key: %w", err) 331 } 332 } 333 return nil 334} 335 336func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 337 if event.Kind != models.EventKindCommit { 338 return nil 339 } 340 341 var err error 342 defer func() { 343 eventTime := event.TimeUS 344 lastTimeUs := eventTime + 1 345 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 346 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 347 } 348 }() 349 350 switch event.Commit.Collection { 351 case tangled.PublicKeyNSID: 352 err = h.processPublicKey(ctx, event) 353 case tangled.KnotMemberNSID: 354 err = h.processKnotMember(ctx, event) 355 case tangled.RepoPullNSID: 356 err = h.processPull(ctx, event) 357 case tangled.RepoCollaboratorNSID: 358 err = h.processCollaborator(ctx, event) 359 } 360 361 if err != nil { 362 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 363 } 364 365 return nil 366}