forked from tangled.org/core
this repo has no description
at master 9.8 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/xrpc" 16 "github.com/bluesky-social/jetstream/pkg/models" 17 securejoin "github.com/cyphar/filepath-securejoin" 18 "tangled.sh/tangled.sh/core/api/tangled" 19 "tangled.sh/tangled.sh/core/idresolver" 20 "tangled.sh/tangled.sh/core/knotserver/db" 21 "tangled.sh/tangled.sh/core/knotserver/git" 22 "tangled.sh/tangled.sh/core/log" 23 "tangled.sh/tangled.sh/core/rbac" 24 "tangled.sh/tangled.sh/core/workflow" 25) 26 27func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error { 28 l := log.FromContext(ctx) 29 raw := json.RawMessage(event.Commit.Record) 30 did := event.Did 31 32 var record tangled.PublicKey 33 if err := json.Unmarshal(raw, &record); err != nil { 34 return fmt.Errorf("failed to unmarshal record: %w", err) 35 } 36 37 pk := db.PublicKey{ 38 Did: did, 39 PublicKey: record, 40 } 41 if err := h.db.AddPublicKey(pk); err != nil { 42 l.Error("failed to add public key", "error", err) 43 return fmt.Errorf("failed to add public key: %w", err) 44 } 45 l.Info("added public key from firehose", "did", did) 46 return nil 47} 48 49func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error { 50 l := log.FromContext(ctx) 51 raw := json.RawMessage(event.Commit.Record) 52 did := event.Did 53 54 var record tangled.KnotMember 55 if err := json.Unmarshal(raw, &record); err != nil { 56 return fmt.Errorf("failed to unmarshal record: %w", err) 57 } 58 59 if record.Domain != h.c.Server.Hostname { 60 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 61 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 62 } 63 64 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 65 if err != nil || !ok { 66 l.Error("failed to add member", "did", did) 67 return fmt.Errorf("failed to enforce permissions: %w", err) 68 } 69 70 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 71 l.Error("failed to add member", "error", err) 72 return fmt.Errorf("failed to add member: %w", err) 73 } 74 l.Info("added member from firehose", "member", record.Subject) 75 76 if err := h.db.AddDid(record.Subject); err != nil { 77 l.Error("failed to add did", "error", err) 78 return fmt.Errorf("failed to add did: %w", err) 79 } 80 h.jc.AddDid(record.Subject) 81 82 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 83 return fmt.Errorf("failed to fetch and add keys: %w", err) 84 } 85 86 return nil 87} 88 89func (h *Knot) processPull(ctx context.Context, event *models.Event) error { 90 raw := json.RawMessage(event.Commit.Record) 91 did := event.Did 92 93 var record tangled.RepoPull 94 if err := json.Unmarshal(raw, &record); err != nil { 95 return fmt.Errorf("failed to unmarshal record: %w", err) 96 } 97 98 l := log.FromContext(ctx) 99 l = l.With("handler", "processPull") 100 l = l.With("did", did) 101 102 if record.Target == nil { 103 return fmt.Errorf("ignoring pull record: target repo is nil") 104 } 105 106 l = l.With("target_repo", record.Target.Repo) 107 l = l.With("target_branch", record.Target.Branch) 108 109 if record.Source == nil { 110 return fmt.Errorf("ignoring pull record: not a branch-based pull request") 111 } 112 113 if record.Source.Repo != nil { 114 return fmt.Errorf("ignoring pull record: fork based pull") 115 } 116 117 repoAt, err := syntax.ParseATURI(record.Target.Repo) 118 if err != nil { 119 return fmt.Errorf("failed to parse ATURI: %w", err) 120 } 121 122 // resolve this aturi to extract the repo record 123 resolver := idresolver.DefaultResolver() 124 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 125 if err != nil || ident.Handle.IsInvalidHandle() { 126 return fmt.Errorf("failed to resolve handle: %w", err) 127 } 128 129 xrpcc := xrpc.Client{ 130 Host: ident.PDSEndpoint(), 131 } 132 133 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 134 if err != nil { 135 return fmt.Errorf("failed to resolver repo: %w", err) 136 } 137 138 repo := resp.Value.Val.(*tangled.Repo) 139 140 if repo.Knot != h.c.Server.Hostname { 141 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 142 } 143 144 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name) 145 if err != nil { 146 return fmt.Errorf("failed to construct relative repo path: %w", err) 147 } 148 149 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 150 if err != nil { 151 return fmt.Errorf("failed to construct absolute repo path: %w", err) 152 } 153 154 gr, err := git.Open(repoPath, record.Source.Branch) 155 if err != nil { 156 return fmt.Errorf("failed to open git repository: %w", err) 157 } 158 159 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 160 if err != nil { 161 return fmt.Errorf("failed to open workflow directory: %w", err) 162 } 163 164 var pipeline workflow.RawPipeline 165 for _, e := range workflowDir { 166 if !e.IsFile { 167 continue 168 } 169 170 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 171 contents, err := gr.RawContent(fpath) 172 if err != nil { 173 continue 174 } 175 176 pipeline = append(pipeline, workflow.RawWorkflow{ 177 Name: e.Name, 178 Contents: contents, 179 }) 180 } 181 182 trigger := tangled.Pipeline_PullRequestTriggerData{ 183 Action: "create", 184 SourceBranch: record.Source.Branch, 185 SourceSha: record.Source.Sha, 186 TargetBranch: record.Target.Branch, 187 } 188 189 compiler := workflow.Compiler{ 190 Trigger: tangled.Pipeline_TriggerMetadata{ 191 Kind: string(workflow.TriggerKindPullRequest), 192 PullRequest: &trigger, 193 Repo: &tangled.Pipeline_TriggerRepo{ 194 Did: repo.Owner, 195 Knot: repo.Knot, 196 Repo: repo.Name, 197 }, 198 }, 199 } 200 201 cp := compiler.Compile(compiler.Parse(pipeline)) 202 eventJson, err := json.Marshal(cp) 203 if err != nil { 204 return fmt.Errorf("failed to marshal pipeline event: %w", err) 205 } 206 207 // do not run empty pipelines 208 if cp.Workflows == nil { 209 return nil 210 } 211 212 ev := db.Event{ 213 Rkey: TID(), 214 Nsid: tangled.PipelineNSID, 215 EventJson: string(eventJson), 216 } 217 218 return h.db.InsertEvent(ev, h.n) 219} 220 221// duplicated from add collaborator 222func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error { 223 raw := json.RawMessage(event.Commit.Record) 224 did := event.Did 225 226 var record tangled.RepoCollaborator 227 if err := json.Unmarshal(raw, &record); err != nil { 228 return fmt.Errorf("failed to unmarshal record: %w", err) 229 } 230 231 repoAt, err := syntax.ParseATURI(record.Repo) 232 if err != nil { 233 return err 234 } 235 236 resolver := idresolver.DefaultResolver() 237 238 subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 239 if err != nil || subjectId.Handle.IsInvalidHandle() { 240 return err 241 } 242 243 // TODO: fix this for good, we need to fetch the record here unfortunately 244 // resolve this aturi to extract the repo record 245 owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 246 if err != nil || owner.Handle.IsInvalidHandle() { 247 return fmt.Errorf("failed to resolve handle: %w", err) 248 } 249 250 xrpcc := xrpc.Client{ 251 Host: owner.PDSEndpoint(), 252 } 253 254 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 255 if err != nil { 256 return err 257 } 258 259 repo := resp.Value.Val.(*tangled.Repo) 260 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 261 262 // check perms for this user 263 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo) 264 if err != nil { 265 return fmt.Errorf("failed to check permissions: %w", err) 266 } 267 if !ok { 268 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo) 269 } 270 271 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 272 return err 273 } 274 h.jc.AddDid(subjectId.DID.String()) 275 276 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 277 return err 278 } 279 280 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 281} 282 283func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 284 l := log.FromContext(ctx) 285 286 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 287 if err != nil { 288 l.Error("error building endpoint url", "did", did, "error", err.Error()) 289 return fmt.Errorf("error building endpoint url: %w", err) 290 } 291 292 resp, err := http.Get(keysEndpoint) 293 if err != nil { 294 l.Error("error getting keys", "did", did, "error", err) 295 return fmt.Errorf("error getting keys: %w", err) 296 } 297 defer resp.Body.Close() 298 299 if resp.StatusCode == http.StatusNotFound { 300 l.Info("no keys found for did", "did", did) 301 return nil 302 } 303 304 plaintext, err := io.ReadAll(resp.Body) 305 if err != nil { 306 l.Error("error reading response body", "error", err) 307 return fmt.Errorf("error reading response body: %w", err) 308 } 309 310 for key := range strings.SplitSeq(string(plaintext), "\n") { 311 if key == "" { 312 continue 313 } 314 pk := db.PublicKey{ 315 Did: did, 316 } 317 pk.Key = key 318 if err := h.db.AddPublicKey(pk); err != nil { 319 l.Error("failed to add public key", "error", err) 320 return fmt.Errorf("failed to add public key: %w", err) 321 } 322 } 323 return nil 324} 325 326func (h *Knot) processMessages(ctx context.Context, event *models.Event) error { 327 if event.Kind != models.EventKindCommit { 328 return nil 329 } 330 331 var err error 332 defer func() { 333 eventTime := event.TimeUS 334 lastTimeUs := eventTime + 1 335 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 336 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 337 } 338 }() 339 340 switch event.Commit.Collection { 341 case tangled.PublicKeyNSID: 342 err = h.processPublicKey(ctx, event) 343 case tangled.KnotMemberNSID: 344 err = h.processKnotMember(ctx, event) 345 case tangled.RepoPullNSID: 346 err = h.processPull(ctx, event) 347 case tangled.RepoCollaboratorNSID: 348 err = h.processCollaborator(ctx, event) 349 } 350 351 if err != nil { 352 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 353 } 354 355 return nil 356}