forked from tangled.org/core
this repo has no description
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "slices" 12 "strings" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/bluesky-social/indigo/xrpc" 17 "github.com/bluesky-social/jetstream/pkg/models" 18 securejoin "github.com/cyphar/filepath-securejoin" 19 "tangled.sh/tangled.sh/core/api/tangled" 20 "tangled.sh/tangled.sh/core/appview/idresolver" 21 "tangled.sh/tangled.sh/core/knotserver/db" 22 "tangled.sh/tangled.sh/core/knotserver/git" 23 "tangled.sh/tangled.sh/core/log" 24 "tangled.sh/tangled.sh/core/workflow" 25) 26 27func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 28 l := log.FromContext(ctx) 29 pk := db.PublicKey{ 30 Did: did, 31 PublicKey: record, 32 } 33 if err := h.db.AddPublicKey(pk); err != nil { 34 l.Error("failed to add public key", "error", err) 35 return fmt.Errorf("failed to add public key: %w", err) 36 } 37 l.Info("added public key from firehose", "did", did) 38 return nil 39} 40 41func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 42 l := log.FromContext(ctx) 43 44 if record.Domain != h.c.Server.Hostname { 45 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 46 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 47 } 48 49 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 50 if err != nil || !ok { 51 l.Error("failed to add member", "did", did) 52 return fmt.Errorf("failed to enforce permissions: %w", err) 53 } 54 55 if err := h.e.AddKnotMember(ThisServer, record.Subject); err != nil { 56 l.Error("failed to add member", "error", err) 57 return fmt.Errorf("failed to add member: %w", err) 58 } 59 l.Info("added member from firehose", "member", record.Subject) 60 61 if err := h.db.AddDid(did); err != nil { 62 l.Error("failed to add did", "error", err) 63 return fmt.Errorf("failed to add did: %w", err) 64 } 65 h.jc.AddDid(did) 66 67 if err := h.fetchAndAddKeys(ctx, did); err != nil { 68 return fmt.Errorf("failed to fetch and add keys: %w", err) 69 } 70 71 return nil 72} 73 74func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error { 75 l := log.FromContext(ctx) 76 l = l.With("handler", "processPull") 77 l = l.With("did", did) 78 l = l.With("target_repo", record.TargetRepo) 79 l = l.With("target_branch", record.TargetBranch) 80 81 if record.Source == nil { 82 reason := "not a branch-based pull request" 83 l.Info("ignoring pull record", "reason", reason) 84 return fmt.Errorf("ignoring pull record: %s", reason) 85 } 86 87 if record.Source.Repo != nil { 88 reason := "fork based pull" 89 l.Info("ignoring pull record", "reason", reason) 90 return fmt.Errorf("ignoring pull record: %s", reason) 91 } 92 93 allDids, err := h.db.GetAllDids() 94 if err != nil { 95 return err 96 } 97 98 // presently: we only process PRs from collaborators for pipelines 99 if !slices.Contains(allDids, did) { 100 reason := "not a known did" 101 l.Info("rejecting pull record", "reason", reason) 102 return fmt.Errorf("rejected pull record: %s, %s", reason, did) 103 } 104 105 repoAt, err := syntax.ParseATURI(record.TargetRepo) 106 if err != nil { 107 return err 108 } 109 110 // resolve this aturi to extract the repo record 111 resolver := idresolver.DefaultResolver() 112 ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 113 if err != nil || ident.Handle.IsInvalidHandle() { 114 return fmt.Errorf("failed to resolve handle: %w", err) 115 } 116 117 xrpcc := xrpc.Client{ 118 Host: ident.PDSEndpoint(), 119 } 120 121 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 122 if err != nil { 123 return err 124 } 125 126 repo := resp.Value.Val.(*tangled.Repo) 127 128 if repo.Knot != h.c.Server.Hostname { 129 reason := "not this knot" 130 l.Info("rejecting pull record", "reason", reason) 131 return fmt.Errorf("rejected pull record: %s", reason) 132 } 133 134 didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name) 135 if err != nil { 136 return err 137 } 138 139 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 140 if err != nil { 141 return err 142 } 143 144 gr, err := git.Open(repoPath, record.Source.Branch) 145 if err != nil { 146 return err 147 } 148 149 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 150 if err != nil { 151 return err 152 } 153 154 var pipeline workflow.Pipeline 155 for _, e := range workflowDir { 156 if !e.IsFile { 157 continue 158 } 159 160 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 161 contents, err := gr.RawContent(fpath) 162 if err != nil { 163 continue 164 } 165 166 wf, err := workflow.FromFile(e.Name, contents) 167 if err != nil { 168 // TODO: log here, respond to client that is pushing 169 h.l.Error("failed to parse workflow", "err", err, "path", fpath) 170 continue 171 } 172 173 pipeline = append(pipeline, wf) 174 } 175 176 trigger := tangled.Pipeline_PullRequestTriggerData{ 177 Action: "create", 178 SourceBranch: record.Source.Branch, 179 SourceSha: record.Source.Sha, 180 TargetBranch: record.TargetBranch, 181 } 182 183 compiler := workflow.Compiler{ 184 Trigger: tangled.Pipeline_TriggerMetadata{ 185 Kind: string(workflow.TriggerKindPullRequest), 186 PullRequest: &trigger, 187 Repo: &tangled.Pipeline_TriggerRepo{ 188 Did: repo.Owner, 189 Knot: repo.Knot, 190 Repo: repo.Name, 191 }, 192 }, 193 } 194 195 cp := compiler.Compile(pipeline) 196 eventJson, err := json.Marshal(cp) 197 if err != nil { 198 return err 199 } 200 201 // do not run empty pipelines 202 if cp.Workflows == nil { 203 return nil 204 } 205 206 event := db.Event{ 207 Rkey: TID(), 208 Nsid: tangled.PipelineNSID, 209 EventJson: string(eventJson), 210 } 211 212 return h.db.InsertEvent(event, h.n) 213} 214 215func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 216 l := log.FromContext(ctx) 217 218 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 219 if err != nil { 220 l.Error("error building endpoint url", "did", did, "error", err.Error()) 221 return fmt.Errorf("error building endpoint url: %w", err) 222 } 223 224 resp, err := http.Get(keysEndpoint) 225 if err != nil { 226 l.Error("error getting keys", "did", did, "error", err) 227 return fmt.Errorf("error getting keys: %w", err) 228 } 229 defer resp.Body.Close() 230 231 if resp.StatusCode == http.StatusNotFound { 232 l.Info("no keys found for did", "did", did) 233 return nil 234 } 235 236 plaintext, err := io.ReadAll(resp.Body) 237 if err != nil { 238 l.Error("error reading response body", "error", err) 239 return fmt.Errorf("error reading response body: %w", err) 240 } 241 242 for _, key := range strings.Split(string(plaintext), "\n") { 243 if key == "" { 244 continue 245 } 246 pk := db.PublicKey{ 247 Did: did, 248 } 249 pk.Key = key 250 if err := h.db.AddPublicKey(pk); err != nil { 251 l.Error("failed to add public key", "error", err) 252 return fmt.Errorf("failed to add public key: %w", err) 253 } 254 } 255 return nil 256} 257 258func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 259 did := event.Did 260 if event.Kind != models.EventKindCommit { 261 return nil 262 } 263 264 var err error 265 defer func() { 266 eventTime := event.TimeUS 267 lastTimeUs := eventTime + 1 268 fmt.Println("lastTimeUs", lastTimeUs) 269 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 270 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 271 } 272 }() 273 274 raw := json.RawMessage(event.Commit.Record) 275 276 switch event.Commit.Collection { 277 case tangled.PublicKeyNSID: 278 var record tangled.PublicKey 279 if err := json.Unmarshal(raw, &record); err != nil { 280 return fmt.Errorf("failed to unmarshal record: %w", err) 281 } 282 if err := h.processPublicKey(ctx, did, record); err != nil { 283 return fmt.Errorf("failed to process public key: %w", err) 284 } 285 286 case tangled.KnotMemberNSID: 287 var record tangled.KnotMember 288 if err := json.Unmarshal(raw, &record); err != nil { 289 return fmt.Errorf("failed to unmarshal record: %w", err) 290 } 291 if err := h.processKnotMember(ctx, did, record); err != nil { 292 return fmt.Errorf("failed to process knot member: %w", err) 293 } 294 case tangled.RepoPullNSID: 295 var record tangled.RepoPull 296 if err := json.Unmarshal(raw, &record); err != nil { 297 return fmt.Errorf("failed to unmarshal record: %w", err) 298 } 299 if err := h.processPull(ctx, did, record); err != nil { 300 return fmt.Errorf("failed to process knot member: %w", err) 301 } 302 } 303 304 return err 305}