1package db
2
3import (
4 "encoding/json"
5 "fmt"
6 "time"
7
8 "tangled.sh/tangled.sh/core/api/tangled"
9 "tangled.sh/tangled.sh/core/notifier"
10 "tangled.sh/tangled.sh/core/spindle/models"
11 "tangled.sh/tangled.sh/core/tid"
12)
13
// Event is one row of the events table: the record key, the NSID naming the
// record type, the creation timestamp in Unix nanoseconds, and the record
// body as a JSON string. The JSON tags define its serialized form; note that
// EventJson is emitted under the "event" key.
type Event struct {
	Rkey      string `json:"rkey"`    // record key of the event
	Nsid      string `json:"nsid"`    // NSID of the record carried in EventJson
	Created   int64  `json:"created"` // Unix timestamp, nanoseconds
	EventJson string `json:"event"`   // JSON-encoded record body
}
20
21func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error {
22 _, err := d.Exec(
23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`,
24 event.Rkey,
25 event.Nsid,
26 event.EventJson,
27 time.Now().UnixNano(),
28 )
29
30 notifier.NotifyAll()
31
32 return err
33}
34
35func (d *DB) GetEvents(cursor int64) ([]Event, error) {
36 whereClause := ""
37 args := []any{}
38 if cursor > 0 {
39 whereClause = "where created > ?"
40 args = append(args, cursor)
41 }
42
43 query := fmt.Sprintf(`
44 select rkey, nsid, event, created
45 from events
46 %s
47 order by created asc
48 limit 100
49 `, whereClause)
50
51 rows, err := d.Query(query, args...)
52 if err != nil {
53 return nil, err
54 }
55 defer rows.Close()
56
57 var evts []Event
58 for rows.Next() {
59 var ev Event
60 if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil {
61 return nil, err
62 }
63 evts = append(evts, ev)
64 }
65
66 if err := rows.Err(); err != nil {
67 return nil, err
68 }
69
70 return evts, nil
71}
72
73func (d *DB) CreateStatusEvent(rkey string, s tangled.PipelineStatus, n *notifier.Notifier) error {
74 eventJson, err := json.Marshal(s)
75 if err != nil {
76 return err
77 }
78
79 event := Event{
80 Rkey: rkey,
81 Nsid: tangled.PipelineStatusNSID,
82 Created: time.Now().UnixNano(),
83 EventJson: string(eventJson),
84 }
85
86 return d.InsertEvent(event, n)
87}
88
// StatusKind is the string label describing the state of a workflow.
type StatusKind string

// The known status kinds. They are constants rather than variables so that no
// package can reassign them at runtime.
const (
	StatusKindPending   StatusKind = "pending"
	StatusKindRunning   StatusKind = "running"
	StatusKindFailed    StatusKind = "failed"
	StatusKindTimeout   StatusKind = "timeout"
	StatusKindCancelled StatusKind = "cancelled"
	StatusKindSuccess   StatusKind = "success"
)
99
100func (d *DB) createStatusEvent(
101 workflowId models.WorkflowId,
102 statusKind StatusKind,
103 workflowError *string,
104 exitCode *int64,
105 n *notifier.Notifier,
106) error {
107 now := time.Now()
108 pipelineAtUri := workflowId.PipelineId.AtUri()
109 s := tangled.PipelineStatus{
110 CreatedAt: now.Format(time.RFC3339),
111 Error: workflowError,
112 ExitCode: exitCode,
113 Pipeline: string(pipelineAtUri),
114 Workflow: workflowId.Name,
115 Status: string(statusKind),
116 }
117
118 eventJson, err := json.Marshal(s)
119 if err != nil {
120 return err
121 }
122
123 event := Event{
124 Rkey: tid.TID(),
125 Nsid: tangled.PipelineStatusNSID,
126 Created: now.UnixNano(),
127 EventJson: string(eventJson),
128 }
129
130 return d.InsertEvent(event, n)
131
132}
133
134func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error {
135 return d.createStatusEvent(workflowId, StatusKindPending, nil, nil, n)
136}
137
138func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error {
139 return d.createStatusEvent(workflowId, StatusKindRunning, nil, nil, n)
140}
141
142func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
143 return d.createStatusEvent(workflowId, StatusKindFailed, &workflowError, &exitCode, n)
144}
145
146func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error {
147 return d.createStatusEvent(workflowId, StatusKindSuccess, nil, nil, n)
148}