1package metrics
2
3import (
4 "database/sql"
5 "fmt"
6 "log/slog"
7 "sync"
8 "time"
9
10 _ "github.com/ClickHouse/clickhouse-go/v2"
11
12 "code.kiri.systems/kiri/alky/pkg/config"
13 "code.kiri.systems/kiri/alky/pkg/dns"
14)
15
16type ClickHouseMetrics struct {
17 db *sql.DB
18 config *config.MetricsConfig
19 queryBuffer []QueryMetric
20 cacheBuffer []CacheMetric
21 mu sync.Mutex
22 stopChan chan struct{}
23 logger *slog.Logger
24}
25
// QueryMetric is one row destined for the alky_dns_queries table. Its fields
// are written, in this order, to the columns timestamp, instance_id,
// query_name, query_type, query_class, remote_addr, response_code, duration
// and cache_hit by flushQueries.
type QueryMetric struct {
	Timestamp  time.Time
	InstanceID string

	QueryName, QueryType, QueryClass string

	RemoteAddr   string
	ResponseCode string
	Duration     int64 // caller-supplied; unit not visible here — TODO confirm
	CacheHit     bool
}
37
// CacheMetric is a timestamped snapshot of DNS cache statistics, stored as one
// row of alky_dns_cache_metrics. The table's instance_id column is not part of
// this struct; flushCacheMetrics fills it in from GetInstanceID().
type CacheMetric struct {
	Timestamp time.Time

	TotalQueries, CacheHits, CacheMisses int64
	NegativeHits, PositiveHits           int64
	Evictions, Size                      int64
}
48
49func NewClickHouseMetrics(config *config.MetricsConfig, logger *slog.Logger) (*ClickHouseMetrics, error) {
50 db, err := sql.Open("clickhouse", config.DSN)
51 if err != nil {
52 return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err)
53 }
54
55 m := &ClickHouseMetrics{
56 db: db,
57 config: config,
58 queryBuffer: make([]QueryMetric, 0, config.BatchSize),
59 cacheBuffer: make([]CacheMetric, 0, config.BatchSize),
60 stopChan: make(chan struct{}),
61 logger: logger,
62 }
63
64 if err := m.changeTTL(); err != nil {
65 db.Close()
66 return nil, fmt.Errorf("failed to initialize tables: %w", err)
67 }
68
69 go m.flushLoop()
70 return m, nil
71}
72
73func (m *ClickHouseMetrics) RecordQuery(metric QueryMetric) {
74 m.mu.Lock()
75 defer m.mu.Unlock()
76
77 m.queryBuffer = append(m.queryBuffer, metric)
78 if len(m.queryBuffer) >= m.config.BatchSize {
79 m.flush()
80 }
81}
82
83func (m *ClickHouseMetrics) RecordCacheMetrics(metric CacheMetric) {
84 m.mu.Lock()
85 defer m.mu.Unlock()
86
87 m.cacheBuffer = append(m.cacheBuffer, metric)
88 if len(m.cacheBuffer) >= m.config.BatchSize {
89 m.flush()
90 }
91}
92
93func (m *ClickHouseMetrics) RecordCacheStats(stats *dns.CacheStats) {
94 m.RecordCacheMetrics(CacheMetric{
95 Timestamp: time.Now(),
96 TotalQueries: stats.TotalQueries,
97 CacheHits: stats.CacheHits,
98 CacheMisses: stats.CacheMisses,
99 NegativeHits: stats.NegativeHits,
100 PositiveHits: stats.PositiveHits,
101 Evictions: stats.Evictions,
102 Size: stats.Size,
103 })
104}
105
106func (m *ClickHouseMetrics) flushLoop() {
107 ticker := time.NewTicker(m.config.FlushInterval.Duration)
108 defer ticker.Stop()
109
110 for {
111 select {
112 case <-ticker.C:
113 m.mu.Lock()
114 m.flush()
115 m.mu.Unlock()
116 case <-m.stopChan:
117 return
118 }
119 }
120}
121
122func (m *ClickHouseMetrics) flush() {
123 if len(m.queryBuffer) > 0 {
124 if err := m.flushQueries(); err != nil {
125 m.logger.Error("Failed to flush query metrics", "error", err)
126 }
127 m.queryBuffer = m.queryBuffer[:0]
128 }
129
130 if len(m.cacheBuffer) > 0 {
131 if err := m.flushCacheMetrics(); err != nil {
132 m.logger.Error("Failed to flush cache metrics", "error", err)
133 }
134 m.cacheBuffer = m.cacheBuffer[:0]
135 }
136}
137
138func (m *ClickHouseMetrics) changeTTL() error {
139 if _, err := m.db.Exec(
140 "ALTER TABLE alky_dns_queries MODIFY TTL timestamp + toIntervalSecond(?)",
141 int(m.config.RetentionPeriod.Seconds()),
142 ); err != nil {
143 return fmt.Errorf("failed to update alky_dns_queries TTL: %w", err)
144 }
145
146 if _, err := m.db.Exec(
147 "ALTER TABLE alky_dns_cache_metrics MODIFY TTL timestamp + toIntervalSecond(?)",
148 int(m.config.RetentionPeriod.Seconds()),
149 ); err != nil {
150 return fmt.Errorf("failed to update alky_dns_cache_metrics TTL: %w", err)
151 }
152
153 return nil
154}
155
156func (m *ClickHouseMetrics) flushQueries() error {
157 tx, err := m.db.Begin()
158 if err != nil {
159 return err
160 }
161 defer tx.Rollback()
162
163 stmt, err := tx.Prepare(`
164 INSERT INTO alky_dns_queries (
165 timestamp, instance_id, query_name, query_type, query_class,
166 remote_addr, response_code, duration, cache_hit
167 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
168 `)
169 if err != nil {
170 return err
171 }
172 defer stmt.Close()
173
174 for _, metric := range m.queryBuffer {
175 _, err := stmt.Exec(
176 metric.Timestamp,
177 metric.InstanceID,
178 metric.QueryName,
179 metric.QueryType,
180 metric.QueryClass,
181 metric.RemoteAddr,
182 metric.ResponseCode,
183 metric.Duration,
184 metric.CacheHit,
185 )
186 if err != nil {
187 return err
188 }
189 }
190
191 return tx.Commit()
192}
193
194func (m *ClickHouseMetrics) flushCacheMetrics() error {
195 tx, err := m.db.Begin()
196 if err != nil {
197 return err
198 }
199 defer tx.Rollback()
200
201 stmt, err := tx.Prepare(`
202 INSERT INTO alky_dns_cache_metrics (
203 timestamp, instance_id, total_queries, cache_hits, cache_misses,
204 negative_hits, positive_hits, evictions, size
205 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
206 `)
207 if err != nil {
208 return err
209 }
210 defer stmt.Close()
211
212 for _, metric := range m.cacheBuffer {
213 _, err := stmt.Exec(
214 metric.Timestamp,
215 GetInstanceID(),
216 metric.TotalQueries,
217 metric.CacheHits,
218 metric.CacheMisses,
219 metric.NegativeHits,
220 metric.PositiveHits,
221 metric.Evictions,
222 metric.Size,
223 )
224 if err != nil {
225 return err
226 }
227 }
228
229 return tx.Commit()
230}
231
232func (m *ClickHouseMetrics) Close() error {
233 close(m.stopChan)
234 m.mu.Lock()
235 defer m.mu.Unlock()
236 m.flush()
237 return m.db.Close()
238}