forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db
2
3import (
4 "encoding/json"
5 "fmt"
6 "time"
7
8 "tangled.org/core/api/tangled"
9 "tangled.org/core/notifier"
10 "tangled.org/core/spindle/models"
11 "tangled.org/core/tid"
12)
13
// Event is a single row of the events table, also used as the JSON shape in
// which events are serialised (see the struct tags).
type Event struct {
	// Rkey is the key stored in the rkey column for this event.
	// NOTE(review): presumably an AT Protocol record key — confirm.
	Rkey string `json:"rkey"`
	// Nsid is the value stored in the nsid column; status events use
	// tangled.PipelineStatusNSID.
	Nsid string `json:"nsid"`
	// Created is a timestamp in nanoseconds since the Unix epoch. GetEvents
	// fills it from the created column and uses the same column as its cursor.
	Created int64 `json:"created"`
	// EventJson holds the JSON-encoded event payload as a string; it is
	// serialised under the "event" key.
	EventJson string `json:"event"`
}
20
21func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error {
22 _, err := d.Exec(
23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`,
24 event.Rkey,
25 event.Nsid,
26 event.EventJson,
27 time.Now().UnixNano(),
28 )
29
30 notifier.NotifyAll()
31
32 return err
33}
34
35func (d *DB) GetEvents(cursor int64) ([]Event, error) {
36 whereClause := ""
37 args := []any{}
38 if cursor > 0 {
39 whereClause = "where created > ?"
40 args = append(args, cursor)
41 }
42
43 query := fmt.Sprintf(`
44 select rkey, nsid, event, created
45 from events
46 %s
47 order by created asc
48 limit 100
49 `, whereClause)
50
51 rows, err := d.Query(query, args...)
52 if err != nil {
53 return nil, err
54 }
55 defer rows.Close()
56
57 var evts []Event
58 for rows.Next() {
59 var ev Event
60 if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil {
61 return nil, err
62 }
63 evts = append(evts, ev)
64 }
65
66 if err := rows.Err(); err != nil {
67 return nil, err
68 }
69
70 return evts, nil
71}
72
73func (d *DB) CreateStatusEvent(rkey string, s tangled.PipelineStatus, n *notifier.Notifier) error {
74 eventJson, err := json.Marshal(s)
75 if err != nil {
76 return err
77 }
78
79 event := Event{
80 Rkey: rkey,
81 Nsid: tangled.PipelineStatusNSID,
82 Created: time.Now().UnixNano(),
83 EventJson: string(eventJson),
84 }
85
86 return d.InsertEvent(event, n)
87}
88
89func (d *DB) createStatusEvent(
90 workflowId models.WorkflowId,
91 statusKind models.StatusKind,
92 workflowError *string,
93 exitCode *int64,
94 n *notifier.Notifier,
95) error {
96 now := time.Now()
97 pipelineAtUri := workflowId.PipelineId.AtUri()
98 s := tangled.PipelineStatus{
99 CreatedAt: now.Format(time.RFC3339),
100 Error: workflowError,
101 ExitCode: exitCode,
102 Pipeline: string(pipelineAtUri),
103 Workflow: workflowId.Name,
104 Status: string(statusKind),
105 }
106
107 eventJson, err := json.Marshal(s)
108 if err != nil {
109 return err
110 }
111
112 event := Event{
113 Rkey: tid.TID(),
114 Nsid: tangled.PipelineStatusNSID,
115 Created: now.UnixNano(),
116 EventJson: string(eventJson),
117 }
118
119 return d.InsertEvent(event, n)
120
121}
122
123func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) {
124 pipelineAtUri := workflowId.PipelineId.AtUri()
125
126 var eventJson string
127 err := d.QueryRow(
128 `
129 select
130 event from events
131 where
132 nsid = ?
133 and json_extract(event, '$.pipeline') = ?
134 and json_extract(event, '$.workflow') = ?
135 order by
136 created desc
137 limit
138 1
139 `,
140 tangled.PipelineStatusNSID,
141 string(pipelineAtUri),
142 workflowId.Name,
143 ).Scan(&eventJson)
144
145 if err != nil {
146 return nil, err
147 }
148
149 var status tangled.PipelineStatus
150 if err := json.Unmarshal([]byte(eventJson), &status); err != nil {
151 return nil, err
152 }
153
154 return &status, nil
155}
156
157func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error {
158 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n)
159}
160
161func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error {
162 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n)
163}
164
165func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
166 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n)
167}
168
169func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error {
170 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n)
171}
172
173func (d *DB) StatusTimeout(workflowId models.WorkflowId, n *notifier.Notifier) error {
174 return d.createStatusEvent(workflowId, models.StatusKindTimeout, nil, nil, n)
175}