From 0b3decb45ad9b0e5a9e1bc2d33e501425d08d40a Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Wed, 9 Jul 2025 18:14:51 +0100 Subject: [PATCH] appview: improve error handling in knotstream/spindlestream Change-Id: xknqquoyvsuyxrwqmutwyuxnlnrppvyt Signed-off-by: oppiliappan --- appview/state/knotstream.go | 15 +++++++-------- appview/state/spindlestream.go | 8 +++++++- knotserver/internal.go | 8 ++------ knotserver/server.go | 1 + 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/appview/state/knotstream.go b/appview/state/knotstream.go index 71432d1..e5b490c 100644 --- a/appview/state/knotstream.go +++ b/appview/state/knotstream.go @@ -143,15 +143,14 @@ func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { // trigger info var trigger db.Trigger var sha string - switch record.TriggerMetadata.Kind { + trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) + switch trigger.Kind { case workflow.TriggerKindPush: - trigger.Kind = workflow.TriggerKindPush trigger.PushRef = &record.TriggerMetadata.Push.Ref trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha sha = *trigger.PushNewSha case workflow.TriggerKindPullRequest: - trigger.Kind = workflow.TriggerKindPush trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha @@ -161,12 +160,12 @@ func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { tx, err := d.Begin() if err != nil { - return err + return fmt.Errorf("failed to start txn: %w", err) } triggerId, err := db.AddTrigger(tx, trigger) if err != nil { - return err + return fmt.Errorf("failed to add trigger entry: %w", err) } pipeline := db.Pipeline{ @@ -180,13 +179,13 @@ func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { err = db.AddPipeline(tx, pipeline) if err != nil { - return err + return fmt.Errorf("failed to add pipeline: %w", err) } err = tx.Commit() if err != nil { - return err + return fmt.Errorf("failed to commit txn: %w", err) } - return err + return nil } diff --git a/appview/state/spindlestream.go b/appview/state/spindlestream.go index d40dfd1..1e98425 100644 --- a/appview/state/spindlestream.go +++ b/appview/state/spindlestream.go @@ -3,6 +3,7 @@ package state import ( "context" "encoding/json" + "fmt" "log/slog" "strings" "time" @@ -100,5 +101,10 @@ func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, so ExitCode: exitCode, } - return db.AddPipelineStatus(d, status) + err = db.AddPipelineStatus(d, status) + if err != nil { + return fmt.Errorf("failed to add pipeline status: %w", err) + } + + return nil } diff --git a/knotserver/internal.go b/knotserver/internal.go index 6631ff7..c0665c6 100644 --- a/knotserver/internal.go +++ b/knotserver/internal.go @@ -147,10 +147,6 @@ func (h *InternalHandle) insertRefUpdate(line git.PostReceiveLine, gitUserDid, r } func (h *InternalHandle) triggerPipeline(line git.PostReceiveLine, gitUserDid, repoDid, repoName string) error { - const ( - WorkflowDir = ".tangled/workflows" - ) - didSlashRepo, err := securejoin.SecureJoin(repoDid, repoName) if err != nil { return err @@ -166,7 +162,7 @@ func (h *InternalHandle) triggerPipeline(line git.PostReceiveLine, gitUserDid, r return err } - workflowDir, err := gr.FileTree(context.Background(), WorkflowDir) + workflowDir, err := gr.FileTree(context.Background(), workflow.WorkflowDir) if err != nil { return err } @@ -177,7 +173,7 @@ func (h *InternalHandle) triggerPipeline(line git.PostReceiveLine, gitUserDid, r continue } - fpath := filepath.Join(WorkflowDir, e.Name) + fpath := filepath.Join(workflow.WorkflowDir, e.Name) contents, err := gr.RawContent(fpath) if err != nil { continue diff --git a/knotserver/server.go b/knotserver/server.go index 984aabc..575795a 100644 --- a/knotserver/server.go +++ b/knotserver/server.go @@ -75,6 +75,7 @@ func Run(ctx context.Context, cmd *cli.Command) error { jc, err := jetstream.NewJetstreamClient(c.Server.JetstreamEndpoint, "knotserver", []string{ tangled.PublicKeyNSID, tangled.KnotMemberNSID, + tangled.RepoPullNSID, }, nil, logger, db, true, c.Server.LogDids) if err != nil { logger.Error("failed to setup jetstream", "error", err) -- 2.43.0