···
14
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
15
+
"github.com/bluesky-social/indigo/atproto/syntax"
16
+
"github.com/bluesky-social/indigo/xrpc"
17
+
"github.com/bluesky-social/jetstream/pkg/models"
18
+
securejoin "github.com/cyphar/filepath-securejoin"
19
+
"tangled.sh/tangled.sh/core/api/tangled"
20
+
"tangled.sh/tangled.sh/core/appview/idresolver"
21
+
"tangled.sh/tangled.sh/core/knotserver/db"
22
+
"tangled.sh/tangled.sh/core/knotserver/git"
23
+
"tangled.sh/tangled.sh/core/log"
24
+
"tangled.sh/tangled.sh/core/workflow"
27
+
func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
28
+
l := log.FromContext(ctx)
33
+
if err := h.db.AddPublicKey(pk); err != nil {
34
+
l.Error("failed to add public key", "error", err)
35
+
return fmt.Errorf("failed to add public key: %w", err)
37
+
l.Info("added public key from firehose", "did", did)
41
+
func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
42
+
l := log.FromContext(ctx)
44
+
if record.Domain != h.c.Server.Hostname {
45
+
l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
46
+
return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
49
+
ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
50
+
if err != nil || !ok {
51
+
l.Error("failed to add member", "did", did)
52
+
return fmt.Errorf("failed to enforce permissions: %w", err)
55
+
if err := h.e.AddKnotMember(ThisServer, record.Subject); err != nil {
56
+
l.Error("failed to add member", "error", err)
57
+
return fmt.Errorf("failed to add member: %w", err)
59
+
l.Info("added member from firehose", "member", record.Subject)
61
+
if err := h.db.AddDid(did); err != nil {
62
+
l.Error("failed to add did", "error", err)
63
+
return fmt.Errorf("failed to add did: %w", err)
67
+
if err := h.fetchAndAddKeys(ctx, did); err != nil {
68
+
return fmt.Errorf("failed to fetch and add keys: %w", err)
74
+
func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
75
+
l := log.FromContext(ctx)
76
+
l = l.With("handler", "processPull")
77
+
l = l.With("did", did)
78
+
l = l.With("target_repo", record.TargetRepo)
79
+
l = l.With("target_branch", record.TargetBranch)
81
+
if record.Source == nil {
82
+
reason := "not a branch-based pull request"
83
+
l.Info("ignoring pull record", "reason", reason)
84
+
return fmt.Errorf("ignoring pull record: %s", reason)
87
+
if record.Source.Repo != nil {
88
+
reason := "fork based pull"
89
+
l.Info("ignoring pull record", "reason", reason)
90
+
return fmt.Errorf("ignoring pull record: %s", reason)
93
+
allDids, err := h.db.GetAllDids()
98
+
// presently: we only process PRs from collaborators for pipelines
99
+
if !slices.Contains(allDids, did) {
100
+
reason := "not a known did"
101
+
l.Info("rejecting pull record", "reason", reason)
102
+
return fmt.Errorf("rejected pull record: %s, %s", reason, did)
105
+
repoAt, err := syntax.ParseATURI(record.TargetRepo)
110
+
// resolve this aturi to extract the repo record
111
+
resolver := idresolver.DefaultResolver()
112
+
ident, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
113
+
if err != nil || ident.Handle.IsInvalidHandle() {
114
+
return fmt.Errorf("failed to resolve handle: %w", err)
117
+
xrpcc := xrpc.Client{
118
+
Host: ident.PDSEndpoint(),
121
+
resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
126
+
repo := resp.Value.Val.(*tangled.Repo)
128
+
if repo.Knot != h.c.Server.Hostname {
129
+
reason := "not this knot"
130
+
l.Info("rejecting pull record", "reason", reason)
131
+
return fmt.Errorf("rejected pull record: %s", reason)
134
+
didSlashRepo, err := securejoin.SecureJoin(repo.Owner, repo.Name)
139
+
repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
144
+
gr, err := git.Open(repoPath, record.Source.Branch)
149
+
workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
154
+
var pipeline workflow.Pipeline
155
+
for _, e := range workflowDir {
160
+
fpath := filepath.Join(workflow.WorkflowDir, e.Name)
161
+
contents, err := gr.RawContent(fpath)
166
+
wf, err := workflow.FromFile(e.Name, contents)
168
+
// TODO: log here, respond to client that is pushing
169
+
h.l.Error("failed to parse workflow", "err", err, "path", fpath)
173
+
pipeline = append(pipeline, wf)
176
+
trigger := tangled.Pipeline_PullRequestTriggerData{
178
+
SourceBranch: record.Source.Branch,
179
+
SourceSha: record.Source.Sha,
180
+
TargetBranch: record.TargetBranch,
183
+
compiler := workflow.Compiler{
184
+
Trigger: tangled.Pipeline_TriggerMetadata{
185
+
Kind: string(workflow.TriggerKindPullRequest),
186
+
PullRequest: &trigger,
187
+
Repo: &tangled.Pipeline_TriggerRepo{
195
+
cp := compiler.Compile(pipeline)
196
+
eventJson, err := json.Marshal(cp)
201
+
// do not run empty pipelines
202
+
if cp.Workflows == nil {
208
+
Nsid: tangled.PipelineNSID,
209
+
EventJson: string(eventJson),
212
+
return h.db.InsertEvent(event, h.n)
215
+
func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
216
+
l := log.FromContext(ctx)
218
+
keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
220
+
l.Error("error building endpoint url", "did", did, "error", err.Error())
221
+
return fmt.Errorf("error building endpoint url: %w", err)
224
+
resp, err := http.Get(keysEndpoint)
226
+
l.Error("error getting keys", "did", did, "error", err)
227
+
return fmt.Errorf("error getting keys: %w", err)
229
+
defer resp.Body.Close()
231
+
if resp.StatusCode == http.StatusNotFound {
232
+
l.Info("no keys found for did", "did", did)
236
+
plaintext, err := io.ReadAll(resp.Body)
238
+
l.Error("error reading response body", "error", err)
239
+
return fmt.Errorf("error reading response body: %w", err)
242
+
for _, key := range strings.Split(string(plaintext), "\n") {
246
+
pk := db.PublicKey{
250
+
if err := h.db.AddPublicKey(pk); err != nil {
251
+
l.Error("failed to add public key", "error", err)
252
+
return fmt.Errorf("failed to add public key: %w", err)
258
+
func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
260
+
if event.Kind != models.EventKindCommit {
266
+
eventTime := event.TimeUS
267
+
lastTimeUs := eventTime + 1
268
+
fmt.Println("lastTimeUs", lastTimeUs)
269
+
if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
270
+
err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
274
+
raw := json.RawMessage(event.Commit.Record)
276
+
switch event.Commit.Collection {
277
+
case tangled.PublicKeyNSID:
278
+
var record tangled.PublicKey
279
+
if err := json.Unmarshal(raw, &record); err != nil {
280
+
return fmt.Errorf("failed to unmarshal record: %w", err)
282
+
if err := h.processPublicKey(ctx, did, record); err != nil {
283
+
return fmt.Errorf("failed to process public key: %w", err)
286
+
case tangled.KnotMemberNSID:
287
+
var record tangled.KnotMember
288
+
if err := json.Unmarshal(raw, &record); err != nil {
289
+
return fmt.Errorf("failed to unmarshal record: %w", err)
291
+
if err := h.processKnotMember(ctx, did, record); err != nil {
292
+
return fmt.Errorf("failed to process knot member: %w", err)
294
+
case tangled.RepoPullNSID:
295
+
var record tangled.RepoPull
296
+
if err := json.Unmarshal(raw, &record); err != nil {
297
+
return fmt.Errorf("failed to unmarshal record: %w", err)
299
+
if err := h.processPull(ctx, did, record); err != nil {
300
+
return fmt.Errorf("failed to process knot member: %w", err)