1package metrics
2
3import (
4 "database/sql"
5 "errors"
6 "fmt"
7 "log/slog"
8 "sync"
9 "time"
10
11 _ "github.com/ClickHouse/clickhouse-go/v2"
12
13 "tangled.sh/seiso.moe/alky/pkg/config"
14)
15
16type ClickHouseMetrics struct {
17 db *sql.DB
18 config *config.MetricsConfig
19 queryBuffer []QueryMetric
20 mu sync.Mutex
21 stopChan chan struct{}
22 wg sync.WaitGroup
23 logger *slog.Logger
24}
25
// QueryMetric is a single DNS query record. Records are buffered by
// ClickHouseMetrics.RecordQuery and each one is inserted as a row of the
// alky_dns_queries table.
type QueryMetric struct {
	// Timestamp is written to the timestamp column, which is also the base
	// column of the table TTL set by checkAndUpdateTTL.
	Timestamp  time.Time
	InstanceID string
	QueryName  string
	QueryType  string
	QueryClass string
	RemoteAddr string
	// ResponseCode is stored as text in the response_code column.
	ResponseCode string
	// Duration is written to the duration column.
	// NOTE(review): the unit (ns/µs/ms) is not established in this file —
	// confirm against the caller that populates it.
	Duration int64
}
36
// NewClickHouseMetrics opens a ClickHouse connection using cfg.DSN, verifies
// it with a ping, applies cfg.RetentionPeriod as the table TTL (best effort:
// a failure is only logged), and starts a background goroutine that flushes
// buffered metrics every cfg.FlushInterval. Call Close to stop that
// goroutine, flush what remains, and release the connection.
//
// If logger is nil, slog.Default() is used. An error is returned when cfg is
// nil, when cfg.FlushInterval is not positive (time.NewTicker would panic in
// the flush goroutine), when cfg.BatchSize is negative (the buffer
// allocation would panic), or when the database cannot be opened or pinged.
//
// The DSN is deliberately never logged or embedded in errors: ClickHouse
// DSNs commonly carry a username and password.
func NewClickHouseMetrics(cfg *config.MetricsConfig, logger *slog.Logger) (*ClickHouseMetrics, error) {
	if cfg == nil {
		return nil, errors.New("metrics config is nil")
	}
	if cfg.FlushInterval.Duration <= 0 {
		return nil, fmt.Errorf("invalid metrics flush interval %v: must be positive", cfg.FlushInterval.Duration)
	}
	if cfg.BatchSize < 0 {
		return nil, fmt.Errorf("invalid metrics batch size %d: must not be negative", cfg.BatchSize)
	}
	if logger == nil {
		logger = slog.Default()
	}
	logger.Info("Connecting to ClickHouse")

	db, err := sql.Open("clickhouse", cfg.DSN)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize ClickHouse driver: %w", err)
	}

	if err := db.Ping(); err != nil {
		db.Close()
		return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err)
	}
	logger.Info("Successfully connected to ClickHouse")

	m := &ClickHouseMetrics{
		db:          db,
		config:      cfg,
		queryBuffer: make([]QueryMetric, 0, cfg.BatchSize),
		stopChan:    make(chan struct{}),
		logger:      logger,
	}

	// TTL maintenance is best effort; the existing table TTL stays in place
	// if the ALTER fails.
	if err := m.checkAndUpdateTTL(); err != nil {
		logger.Warn("Failed to check/update table TTLs, using existing TTL", "error", err)
	} else {
		logger.Info("Table TTLs verified/updated", "retention", cfg.RetentionPeriod.Duration)
	}

	m.wg.Add(1)
	go m.flushLoop()
	return m, nil
}
72
73func (m *ClickHouseMetrics) RecordQuery(metric QueryMetric) {
74 m.mu.Lock()
75 defer m.mu.Unlock()
76
77 m.queryBuffer = append(m.queryBuffer, metric)
78 if len(m.queryBuffer) >= m.config.BatchSize {
79 m.flushQueriesLocked()
80 }
81}
82
// flushLoop is the background goroutine started by NewClickHouseMetrics. It
// flushes the buffer on every tick of config.FlushInterval. When stopChan is
// closed it does one final flush and returns, marking wg done so Close can
// proceed.
func (m *ClickHouseMetrics) flushLoop() {
	defer m.wg.Done()

	interval := m.config.FlushInterval.Duration
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	m.logger.Info("Metrics flush loop started", "interval", interval)

	// flushUnderLock takes the buffer mutex around a single flush.
	flushUnderLock := func() {
		m.mu.Lock()
		defer m.mu.Unlock()
		m.flushLocked()
	}

	for {
		select {
		case <-m.stopChan:
			m.logger.Info("Metrics flush loop received stop signal. Flushing remaining data...")
			flushUnderLock()
			m.logger.Info("Metrics flush loop stopped.")
			return
		case <-ticker.C:
			flushUnderLock()
		}
	}
}
106
107func (m *ClickHouseMetrics) flushLocked() {
108 if len(m.queryBuffer) > 0 {
109 m.flushQueriesLocked()
110 }
111}
112
// checkAndUpdateTTL sets each metrics table's TTL to
// `timestamp + config.RetentionPeriod` by issuing ALTER TABLE ... MODIFY TTL.
// It does not read the current TTL first; the ALTER is sent on every call.
//
// A retention period that rounds to zero or fewer seconds is logged and
// skipped, and nil is returned. Any other ALTER failure is returned wrapped
// with the table name.
func (m *ClickHouseMetrics) checkAndUpdateTTL() error {
	retention := m.config.RetentionPeriod.Duration
	seconds := int(retention.Seconds())
	if seconds <= 0 {
		m.logger.Warn("Invalid retention period configured, skipping TTL update", "retention", retention)
		return nil
	}

	m.logger.Info("Checking/Updating table TTLs", "retention_seconds", seconds)

	for _, table := range []string{"alky_dns_queries"} {
		// The table name is a fixed literal above, so formatting it into the
		// statement is safe; the retention value is bound as a parameter.
		stmt := fmt.Sprintf("ALTER TABLE %s MODIFY TTL timestamp + toIntervalSecond(?)", table)
		m.logger.Debug("Executing TTL update query", "query", stmt)

		_, err := m.db.Exec(stmt, seconds)
		switch {
		case err == nil:
			m.logger.Debug("TTL updated successfully", "table", table)
		case errors.Is(err, sql.ErrNoRows):
			// NOTE(review): Exec does not return sql.ErrNoRows, and a missing
			// table surfaces as a driver/server exception instead. This branch
			// is therefore likely unreachable, and a missing table falls
			// through to the error below. Detecting it properly needs the
			// driver's exception type (ClickHouse UNKNOWN_TABLE, code 60).
			m.logger.Warn("Table not found while updating TTL, likely needs migration", "table", table)
		default:
			return fmt.Errorf("failed to update %s TTL: %w", table, err)
		}
	}

	return nil
}
142
// flushQueriesLocked inserts every buffered QueryMetric into
// alky_dns_queries in a single transaction, then empties the buffer. The
// caller must hold m.mu.
//
// The buffer is emptied whether or not the write succeeds: a failed batch
// is logged and dropped, not retried. Retaining it would let the buffer grow
// without bound while ClickHouse is unreachable. Because RecordQuery flushes
// on every append once the buffer reaches BatchSize, each new metric would
// also re-send the whole backlog.
func (m *ClickHouseMetrics) flushQueriesLocked() {
	if len(m.queryBuffer) == 0 {
		return
	}

	total := len(m.queryBuffer)
	committed := false

	// Runs last (registered first): report dropped data and reset the buffer.
	// clear() releases the strings held by old entries while the slice keeps
	// its capacity for reuse.
	defer func() {
		if !committed {
			m.logger.Warn("Dropping query metrics after failed flush", "count", total)
		}
		clear(m.queryBuffer)
		m.queryBuffer = m.queryBuffer[:0]
	}()

	m.logger.Debug("Flushing query metrics", "count", total)
	start := time.Now()

	tx, err := m.db.Begin()
	if err != nil {
		m.logger.Error("Failed to begin transaction for query metrics", "error", err)
		return
	}
	defer func() {
		if committed {
			return
		}
		// After a failed Commit the transaction is already finished, and
		// Rollback returns sql.ErrTxDone. That is expected, so don't log it.
		if rbErr := tx.Rollback(); rbErr != nil && !errors.Is(rbErr, sql.ErrTxDone) {
			m.logger.Error("Failed to rollback transaction for query metrics", "error", rbErr)
		}
	}()

	// Nine columns, nine placeholders.
	stmt, err := tx.Prepare(`
		INSERT INTO alky_dns_queries (
			timestamp, instance_id, query_name, query_type, query_class,
			remote_addr, response_code, duration, cache_hit
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
	`)
	if err != nil {
		m.logger.Error("Failed to prepare statement for query metrics", "error", err)
		return
	}
	defer stmt.Close()

	for i := range m.queryBuffer {
		metric := &m.queryBuffer[i]
		if _, err := stmt.Exec(
			metric.Timestamp,
			metric.InstanceID,
			metric.QueryName,
			metric.QueryType,
			metric.QueryClass,
			metric.RemoteAddr,
			metric.ResponseCode,
			metric.Duration,
			false, // cache_hit: QueryMetric does not carry this, so it is always false
		); err != nil {
			m.logger.Error("Failed to execute statement for query metric", "error", err, "metric_index", i)
			return
		}
	}

	if err := tx.Commit(); err != nil {
		m.logger.Error("Failed to commit transaction for query metrics", "error", err)
		return
	}
	committed = true

	m.logger.Debug("Successfully flushed query metrics", "count", total, "duration", time.Since(start))
}
207
208func (m *ClickHouseMetrics) Close() error {
209 m.logger.Info("Closing metrics client...")
210 close(m.stopChan)
211 m.wg.Wait()
212 m.logger.Info("Flush loop stopped. Closing database connection.")
213 return m.db.Close()
214}