forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db
2
3//
4// import (
5// "database/sql"
6// "fmt"
7// "time"
8//
9// "tangled.sh/tangled.sh/core/api/tangled"
10// "tangled.sh/tangled.sh/core/notifier"
11// )
12//
// PipelineRunStatus (disabled code) is the string-typed state of a pipeline run;
// six values are declared for it below.
// NOTE(review): the values are declared with `var`, so they would be reassignable
// package-level variables; a `const` block would make them immutable if this
// code is re-enabled.
// NOTE(review): no function in the visible part of this file ever writes
// PipelineCancelled — confirm whether a writer exists elsewhere.
13// type PipelineRunStatus string
14//
15// var (
16// PipelinePending PipelineRunStatus = "pending"
17// PipelineRunning PipelineRunStatus = "running"
18// PipelineFailed PipelineRunStatus = "failed"
19// PipelineTimeout PipelineRunStatus = "timeout"
20// PipelineCancelled PipelineRunStatus = "cancelled"
21// PipelineSuccess PipelineRunStatus = "success"
22// )
23//
// PipelineStatus (disabled code) carries the state of one pipeline run, with
// JSON tags for every field.
// NOTE(review): neither SELECT in the visible part of this file reads the
// `pipeline` column, so Pipeline would stay empty on values loaded from the
// database.
// NOTE(review): the only visible code that fills LastUpdate scans the
// `created_at` column into it, while the writers store the nanosecond stamp in
// `last_update` — confirm which column is intended before re-enabling.
24// type PipelineStatus struct {
25// Rkey string `json:"rkey"`
26// Pipeline string `json:"pipeline"`
27// Status PipelineRunStatus `json:"status"`
28//
29// // only if Failed
30// Error string `json:"error"`
31// ExitCode int `json:"exit_code"`
32//
33// LastUpdate int64 `json:"last_update"`
34// StartedAt time.Time `json:"started_at"`
35// UpdatedAt time.Time `json:"updated_at"`
36// FinishedAt time.Time `json:"finished_at"`
37// }
38//
// AsRecord (disabled code) converts a PipelineStatus into a
// *tangled.PipelineStatus, copying Pipeline and Status and stringifying the
// three timestamps. Rkey and LastUpdate are not carried over.
// NOTE(review): the timestamps are rendered with time.Time.String(), whose
// output is a debugging format, not RFC 3339; the reader in this file parses
// timestamps with time.RFC3339. Prefer Format(time.RFC3339) if re-enabled.
// NOTE(review): ExitCode, Error and FinishedAt are always non-nil pointers, so
// a run that has not failed or finished would still be emitted with exit code
// 0, an empty error and a zero-value finish time.
39// func (p PipelineStatus) AsRecord() *tangled.PipelineStatus {
40// exitCode64 := int64(p.ExitCode)
41// finishedAt := p.FinishedAt.String()
42//
43// return &tangled.PipelineStatus{
44// LexiconTypeID: tangled.PipelineStatusNSID,
45// Pipeline: p.Pipeline,
46// Status: string(p.Status),
47//
48// ExitCode: &exitCode64,
49// Error: &p.Error,
50//
51// StartedAt: p.StartedAt.String(),
52// UpdatedAt: p.UpdatedAt.String(),
53// FinishedAt: &finishedAt,
54// }
55// }
56//
// pipelineAtUri (disabled code) formats a string of the shape
// at://<PipelineStatusNSID>/did:web:<knot>/<rkey>.
// NOTE(review): AT URIs are normally at://<authority>/<collection>/<rkey>,
// with the DID as authority and the NSID as collection; here the NSID and the
// DID appear swapped — TODO confirm against the AT URI spec before re-enabling.
// NOTE(review): nothing in the visible part of this file calls this helper.
57// func pipelineAtUri(rkey, knot string) string {
58// return fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineStatusNSID, knot, rkey)
59// }
60//
// CreatePipeline (disabled code) inserts a pipeline_status row for rkey with
// status PipelinePending, the given pipeline value, and last_update set to
// time.Now().UnixNano(). The four bound arguments line up with the four
// placeholders. It returns the Exec error unchanged; on success it calls
// n.NotifyAll() and returns nil.
61// func (db *DB) CreatePipeline(rkey, pipeline string, n *notifier.Notifier) error {
62// _, err := db.Exec(`
63// insert into pipeline_status (rkey, status, pipeline, last_update)
64// values (?, ?, ?, ?)
65// `, rkey, PipelinePending, pipeline, time.Now().UnixNano())
66//
67// if err != nil {
68// return err
69// }
70// n.NotifyAll()
71// return nil
72// }
73//
// MarkPipelineRunning (disabled code) is meant to set status to
// PipelineRunning, refresh updated_at and last_update for the row with the
// given rkey, then call n.NotifyAll().
// NOTE(review): BUG — the placeholders are, in order, status, last_update,
// rkey, but the arguments are passed as (PipelineRunning, rkey, timestamp).
// As written, rkey is bound to last_update and the nanosecond timestamp is
// bound to the WHERE clause, so the update will normally match no row. The
// arguments should be (PipelineRunning, time.Now().UnixNano(), rkey).
// NOTE(review): the Exec result is discarded, so a zero-row update is not
// detected and NotifyAll still fires (assuming db.Exec follows database/sql
// semantics — confirm).
74// func (db *DB) MarkPipelineRunning(rkey string, n *notifier.Notifier) error {
75// _, err := db.Exec(`
76// update pipeline_status
77// set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), last_update = ?
78// where rkey = ?
79// `, PipelineRunning, rkey, time.Now().UnixNano())
80//
81// if err != nil {
82// return err
83// }
84// n.NotifyAll()
85// return nil
86// }
87//
// MarkPipelineFailed (disabled code) is meant to record a failed run: status
// PipelineFailed, the exit code and error message, updated_at and finished_at
// set to the current UTC time, and last_update refreshed; it then calls
// n.NotifyAll().
// NOTE(review): BUG — the placeholders are, in order, status, exit_code,
// error, last_update, rkey, but the last two arguments are passed as
// (rkey, timestamp). As written, rkey is bound to last_update and the
// nanosecond timestamp to the WHERE clause, so the update will normally match
// no row. The arguments should end with (time.Now().UnixNano(), rkey).
88// func (db *DB) MarkPipelineFailed(rkey string, exitCode int, errorMsg string, n *notifier.Notifier) error {
89// _, err := db.Exec(`
90// update pipeline_status
91// set status = ?,
92// exit_code = ?,
93// error = ?,
94// updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
95// finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
96// last_update = ?
97// where rkey = ?
98// `, PipelineFailed, exitCode, errorMsg, rkey, time.Now().UnixNano())
99// if err != nil {
100// return err
101// }
102// n.NotifyAll()
103// return nil
104// }
105//
// MarkPipelineTimeout (disabled code) sets status to PipelineTimeout and
// refreshes updated_at for the row with the given rkey, then calls
// n.NotifyAll(). The two bound arguments match the two placeholders.
// NOTE(review): unlike MarkPipelineFailed and MarkPipelineSuccess, this does
// not set finished_at, and unlike MarkPipelineRunning and MarkPipelineFailed
// it does not refresh last_update — presumably a timeout is also a terminal
// state; confirm and align before re-enabling.
106// func (db *DB) MarkPipelineTimeout(rkey string, n *notifier.Notifier) error {
107// _, err := db.Exec(`
108// update pipeline_status
109// set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
110// where rkey = ?
111// `, PipelineTimeout, rkey)
112// if err != nil {
113// return err
114// }
115// n.NotifyAll()
116// return nil
117// }
118//
// MarkPipelineSuccess (disabled code) sets status to PipelineSuccess and sets
// both updated_at and finished_at to the current UTC time for the row with the
// given rkey, then calls n.NotifyAll(). The two bound arguments match the two
// placeholders.
// NOTE(review): last_update is not refreshed here, although
// MarkPipelineRunning and MarkPipelineFailed both write it — confirm whether
// consumers of last_update need to see the success transition.
119// func (db *DB) MarkPipelineSuccess(rkey string, n *notifier.Notifier) error {
120// _, err := db.Exec(`
121// update pipeline_status
122// set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
123// finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
124// where rkey = ?
125// `, PipelineSuccess, rkey)
126//
127// if err != nil {
128// return err
129// }
130// n.NotifyAll()
131// return nil
132// }
133//
// GetPipelineStatus (disabled code) loads one row by rkey and scans it
// directly into a PipelineStatus; Pipeline and LastUpdate are not selected.
// The possibly partially filled value is returned together with the Scan
// error.
// NOTE(review): this reads from a table named `pipelines`, while every other
// statement in the visible part of this file uses `pipeline_status` — likely
// a stale table name; verify against the schema.
// NOTE(review): error, exit_code and finished_at are scanned straight into
// string/int/time.Time fields, whereas GetPipelineStatusAsRecords treats the
// same columns as nullable (sql.NullString, sql.NullInt64, sql.NullTime). If
// they can be NULL, Scan would fail here for unfinished runs — confirm.
134// func (db *DB) GetPipelineStatus(rkey string) (PipelineStatus, error) {
135// var p PipelineStatus
136// err := db.QueryRow(`
137// select rkey, status, error, exit_code, started_at, updated_at, finished_at
138// from pipelines
139// where rkey = ?
140// `, rkey).Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt)
141// return p, err
142// }
143//
// GetPipelineStatusAsRecords (disabled code) returns up to 100 rows of
// pipeline_status ordered by created_at ascending. When cursor is non-zero,
// only rows with created_at > cursor are returned. NULL error, exit_code and
// finished_at columns leave the corresponding fields at their zero values.
// Only the WHERE fragment is spliced in with Sprintf; the cursor value itself
// is passed as a bound argument.
// NOTE(review): despite the name, this returns []PipelineStatus, not the
// record type produced by AsRecord.
// NOTE(review): the cursor and LastUpdate are both tied to `created_at`
// (created_at is scanned into p.LastUpdate), while the writers in this file
// store the nanosecond stamp in `last_update` and never write created_at —
// confirm which column the cursor is meant to follow.
// NOTE(review): started_at and updated_at are scanned into plain strings and
// time.Parse failures are silently ignored, leaving zero-value times; if
// either column can be NULL the Scan itself would fail — verify against the
// schema.
// NOTE(review): the `pipeline` column is not selected, so Pipeline stays empty.
144// func (db *DB) GetPipelineStatusAsRecords(cursor int64) ([]PipelineStatus, error) {
145// whereClause := ""
146// args := []any{}
147// if cursor != 0 {
148// whereClause = "where created_at > ?"
149// args = append(args, cursor)
150// }
151//
152// query := fmt.Sprintf(`
153// select rkey, status, error, exit_code, created_at, started_at, updated_at, finished_at
154// from pipeline_status
155// %s
156// order by created_at asc
157// limit 100
158// `, whereClause)
159//
160// rows, err := db.Query(query, args...)
161// if err != nil {
162// return nil, err
163// }
164// defer rows.Close()
165//
166// var pipelines []PipelineStatus
167// for rows.Next() {
168// var p PipelineStatus
169// var pipelineError sql.NullString
170// var exitCode sql.NullInt64
171// var startedAt, updatedAt string
172// var finishedAt sql.NullTime
173//
174// err := rows.Scan(&p.Rkey, &p.Status, &pipelineError, &exitCode, &p.LastUpdate, &startedAt, &updatedAt, &finishedAt)
175// if err != nil {
176// return nil, err
177// }
178//
179// if pipelineError.Valid {
180// p.Error = pipelineError.String
181// }
182//
183// if exitCode.Valid {
184// p.ExitCode = int(exitCode.Int64)
185// }
186//
187// if v, err := time.Parse(time.RFC3339, startedAt); err == nil {
188// p.StartedAt = v
189// }
190//
191// if v, err := time.Parse(time.RFC3339, updatedAt); err == nil {
192// p.UpdatedAt = v
193// }
194//
195// if finishedAt.Valid {
196// p.FinishedAt = finishedAt.Time
197// }
198//
199// pipelines = append(pipelines, p)
200// }
201//
202// if err := rows.Err(); err != nil {
203// return nil, err
204// }
205//
206// return pipelines, nil
207// }