forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db
2
3import (
4 "fmt"
5 "time"
6
7 "tangled.sh/tangled.sh/core/api/tangled"
8 "tangled.sh/tangled.sh/core/knotserver/notifier"
9)
10
// PipelineStatus is the lifecycle state of a pipeline, as written to the
// status column of the pipelines table.
type PipelineStatus string

// Known pipeline states. They are declared as typed constants rather than
// variables so that no code can reassign them at runtime.
const (
	PipelinePending   PipelineStatus = "pending"
	PipelineRunning   PipelineStatus = "running"
	PipelineFailed    PipelineStatus = "failed"
	PipelineTimeout   PipelineStatus = "timeout"
	PipelineCancelled PipelineStatus = "cancelled"
	PipelineSuccess   PipelineStatus = "success"
)
21
22type Pipeline struct {
23 Rkey string `json:"rkey"`
24 Knot string `json:"knot"`
25 Status PipelineStatus `json:"status"`
26
27 // only if Failed
28 Error string `json:"error"`
29 ExitCode int `json:"exit_code"`
30
31 StartedAt time.Time `json:"started_at"`
32 UpdatedAt time.Time `json:"updated_at"`
33 FinishedAt time.Time `json:"finished_at"`
34}
35
36func (p Pipeline) AsRecord() *tangled.PipelineStatus {
37 exitCode64 := int64(p.ExitCode)
38 finishedAt := p.FinishedAt.String()
39
40 return &tangled.PipelineStatus{
41 Pipeline: fmt.Sprintf("at://%s/%s", p.Knot, p.Rkey),
42 Status: string(p.Status),
43
44 ExitCode: &exitCode64,
45 Error: &p.Error,
46
47 StartedAt: p.StartedAt.String(),
48 UpdatedAt: p.UpdatedAt.String(),
49 FinishedAt: &finishedAt,
50 }
51}
52
53func pipelineAtUri(rkey, knot string) string {
54 return fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineStatusNSID, knot, rkey)
55}
56
57func (db *DB) CreatePipeline(rkey, knot string, n *notifier.Notifier) error {
58 _, err := db.Exec(`
59 insert into pipelines (at_uri, status)
60 values (?, ?)
61 `, pipelineAtUri(rkey, knot), PipelinePending)
62
63 if err != nil {
64 return err
65 }
66 n.NotifyAll()
67 return nil
68}
69
70func (db *DB) MarkPipelineRunning(rkey, knot string, n *notifier.Notifier) error {
71 _, err := db.Exec(`
72 update pipelines
73 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
74 where at_uri = ?
75 `, PipelineRunning, pipelineAtUri(rkey, knot))
76
77 if err != nil {
78 return err
79 }
80 n.NotifyAll()
81 return nil
82}
83
84func (db *DB) MarkPipelineFailed(rkey, knot string, exitCode int, errorMsg string, n *notifier.Notifier) error {
85 _, err := db.Exec(`
86 update pipelines
87 set status = ?,
88 exit_code = ?,
89 error = ?,
90 updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
91 finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
92 where at_uri = ?
93 `, PipelineFailed, exitCode, errorMsg, pipelineAtUri(rkey, knot))
94 if err != nil {
95 return err
96 }
97 n.NotifyAll()
98 return nil
99}
100
101func (db *DB) MarkPipelineTimeout(rkey, knot string, n *notifier.Notifier) error {
102 _, err := db.Exec(`
103 update pipelines
104 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
105 where at_uri = ?
106 `, PipelineTimeout, pipelineAtUri(rkey, knot))
107 if err != nil {
108 return err
109 }
110 n.NotifyAll()
111 return nil
112}
113
114func (db *DB) MarkPipelineSuccess(rkey, knot string, n *notifier.Notifier) error {
115 _, err := db.Exec(`
116 update pipelines
117 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
118 finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
119 where at_uri = ?
120 `, PipelineSuccess, pipelineAtUri(rkey, knot))
121
122 if err != nil {
123 return err
124 }
125 n.NotifyAll()
126 return nil
127}
128
129func (db *DB) GetPipeline(rkey, knot string) (Pipeline, error) {
130 var p Pipeline
131 err := db.QueryRow(`
132 select rkey, status, error, exit_code, started_at, updated_at, finished_at
133 from pipelines
134 where at_uri = ?
135 `, pipelineAtUri(rkey, knot)).Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt)
136 return p, err
137}
138
139func (db *DB) GetPipelines(cursor string) ([]Pipeline, error) {
140 whereClause := ""
141 args := []any{}
142 if cursor != "" {
143 whereClause = "where rkey > ?"
144 args = append(args, cursor)
145 }
146
147 query := fmt.Sprintf(`
148 select rkey, status, error, exit_code, started_at, updated_at, finished_at
149 from pipelines
150 %s
151 order by rkey asc
152 limit 100
153 `, whereClause)
154
155 rows, err := db.Query(query, args...)
156 if err != nil {
157 return nil, err
158 }
159 defer rows.Close()
160
161 var pipelines []Pipeline
162 for rows.Next() {
163 var p Pipeline
164 rows.Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt)
165 pipelines = append(pipelines, p)
166 }
167
168 if err := rows.Err(); err != nil {
169 return nil, err
170 }
171
172 return pipelines, nil
173}