1package db
2
3import (
4 "fmt"
5 "time"
6
7 "tangled.sh/tangled.sh/core/api/tangled"
8 "tangled.sh/tangled.sh/core/notifier"
9)
10
// PipelineRunStatus is the lifecycle state of a pipeline run, stored as a
// plain string in the status column of the pipeline_status table.
type PipelineRunStatus string

// Known pipeline run states. They are constants rather than variables so the
// set of status strings cannot be reassigned at runtime.
const (
	// PipelinePending is the status a row is created with.
	PipelinePending PipelineRunStatus = "pending"
	// PipelineRunning marks a run that has started.
	PipelineRunning PipelineRunStatus = "running"
	// PipelineFailed marks a run that ended with an error.
	PipelineFailed PipelineRunStatus = "failed"
	// PipelineTimeout marks a run that timed out.
	PipelineTimeout PipelineRunStatus = "timeout"
	// PipelineCancelled marks a run that was cancelled.
	PipelineCancelled PipelineRunStatus = "cancelled"
	// PipelineSuccess marks a run that completed successfully.
	PipelineSuccess PipelineRunStatus = "success"
)
21
// PipelineStatus is the in-memory form of one row of the pipeline_status
// table. AsRecord converts it to the tangled.PipelineStatus record type.
type PipelineStatus struct {
	// Rkey is the record key identifying the row (the rkey column).
	Rkey string `json:"rkey"`
	// Pipeline is the value of the pipeline column supplied at creation.
	Pipeline string `json:"pipeline"`
	// Status is the current lifecycle state of the run.
	Status PipelineRunStatus `json:"status"`

	// Error and ExitCode are only meaningful when Status is PipelineFailed;
	// they are written by MarkPipelineFailed and left at their zero values
	// otherwise.
	Error    string `json:"error"`
	ExitCode int    `json:"exit_code"`

	// StartedAt is read from the started_at column.
	// NOTE(review): no statement in this file writes started_at, so it is
	// presumably filled by a column default — confirm against the schema.
	StartedAt time.Time `json:"started_at"`
	// UpdatedAt is refreshed by every status transition.
	UpdatedAt time.Time `json:"updated_at"`
	// FinishedAt is set when a run is marked finished (for example by
	// MarkPipelineFailed or MarkPipelineSuccess).
	FinishedAt time.Time `json:"finished_at"`
}
35
36func (p PipelineStatus) AsRecord() *tangled.PipelineStatus {
37 exitCode64 := int64(p.ExitCode)
38 finishedAt := p.FinishedAt.String()
39
40 return &tangled.PipelineStatus{
41 LexiconTypeID: tangled.PipelineStatusNSID,
42 Pipeline: p.Pipeline,
43 Status: string(p.Status),
44
45 ExitCode: &exitCode64,
46 Error: &p.Error,
47
48 StartedAt: p.StartedAt.String(),
49 UpdatedAt: p.UpdatedAt.String(),
50 FinishedAt: &finishedAt,
51 }
52}
53
54func pipelineAtUri(rkey, knot string) string {
55 return fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineStatusNSID, knot, rkey)
56}
57
58func (db *DB) CreatePipeline(rkey, pipeline string, n *notifier.Notifier) error {
59 _, err := db.Exec(`
60 insert into pipeline_status (rkey, status, pipeline)
61 values (?, ?, ?)
62 `, rkey, PipelinePending, pipeline)
63
64 if err != nil {
65 return err
66 }
67 n.NotifyAll()
68 return nil
69}
70
71func (db *DB) MarkPipelineRunning(rkey string, n *notifier.Notifier) error {
72 _, err := db.Exec(`
73 update pipeline_status
74 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
75 where rkey = ?
76 `, PipelineRunning, rkey)
77
78 if err != nil {
79 return err
80 }
81 n.NotifyAll()
82 return nil
83}
84
85func (db *DB) MarkPipelineFailed(rkey string, exitCode int, errorMsg string, n *notifier.Notifier) error {
86 _, err := db.Exec(`
87 update pipeline_status
88 set status = ?,
89 exit_code = ?,
90 error = ?,
91 updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
92 finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
93 where rkey = ?
94 `, PipelineFailed, exitCode, errorMsg, rkey)
95 if err != nil {
96 return err
97 }
98 n.NotifyAll()
99 return nil
100}
101
102func (db *DB) MarkPipelineTimeout(rkey string, n *notifier.Notifier) error {
103 _, err := db.Exec(`
104 update pipeline_status
105 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
106 where rkey = ?
107 `, PipelineTimeout, rkey)
108 if err != nil {
109 return err
110 }
111 n.NotifyAll()
112 return nil
113}
114
115func (db *DB) MarkPipelineSuccess(rkey string, n *notifier.Notifier) error {
116 _, err := db.Exec(`
117 update pipeline_status
118 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
119 finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
120 where rkey = ?
121 `, PipelineSuccess, rkey)
122
123 if err != nil {
124 return err
125 }
126 n.NotifyAll()
127 return nil
128}
129
130func (db *DB) GetPipelineStatus(rkey string) (PipelineStatus, error) {
131 var p PipelineStatus
132 err := db.QueryRow(`
133 select rkey, status, error, exit_code, started_at, updated_at, finished_at
134 from pipelines
135 where rkey = ?
136 `, rkey).Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt)
137 return p, err
138}
139
140func (db *DB) GetPipelineStatusAsRecords(cursor string) ([]PipelineStatus, error) {
141 whereClause := ""
142 args := []any{}
143 if cursor != "" {
144 whereClause = "where rkey > ?"
145 args = append(args, cursor)
146 }
147
148 query := fmt.Sprintf(`
149 select rkey, status, error, exit_code, started_at, updated_at, finished_at
150 from pipeline_status
151 %s
152 order by rkey asc
153 limit 100
154 `, whereClause)
155
156 rows, err := db.Query(query, args...)
157 if err != nil {
158 return nil, err
159 }
160 defer rows.Close()
161
162 var pipelines []PipelineStatus
163 for rows.Next() {
164 var p PipelineStatus
165 rows.Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt)
166 pipelines = append(pipelines, p)
167 }
168
169 if err := rows.Err(); err != nil {
170 return nil, err
171 }
172
173 records := []*tangled.PipelineStatus{}
174 for _, p := range pipelines {
175 records = append(records, p.AsRecord())
176 }
177
178 return pipelines, nil
179}