A recursive DNS resolver.
1package metrics 2 3import ( 4 "database/sql" 5 "fmt" 6 "log/slog" 7 "sync" 8 "time" 9 10 _ "github.com/ClickHouse/clickhouse-go/v2" 11 12 "code.kiri.systems/kiri/alky/pkg/config" 13 "code.kiri.systems/kiri/alky/pkg/dns" 14) 15 16type ClickHouseMetrics struct { 17 db *sql.DB 18 config *config.MetricsConfig 19 queryBuffer []QueryMetric 20 cacheBuffer []CacheMetric 21 mu sync.Mutex 22 stopChan chan struct{} 23 logger *slog.Logger 24} 25 26type QueryMetric struct { 27 Timestamp time.Time 28 InstanceID string 29 QueryName string 30 QueryType string 31 QueryClass string 32 RemoteAddr string 33 ResponseCode string 34 Duration int64 35 CacheHit bool 36} 37 38type CacheMetric struct { 39 Timestamp time.Time 40 TotalQueries int64 41 CacheHits int64 42 CacheMisses int64 43 NegativeHits int64 44 PositiveHits int64 45 Evictions int64 46 Size int64 47} 48 49func NewClickHouseMetrics(config *config.MetricsConfig, logger *slog.Logger) (*ClickHouseMetrics, error) { 50 db, err := sql.Open("clickhouse", config.DSN) 51 if err != nil { 52 return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err) 53 } 54 55 m := &ClickHouseMetrics{ 56 db: db, 57 config: config, 58 queryBuffer: make([]QueryMetric, 0, config.BatchSize), 59 cacheBuffer: make([]CacheMetric, 0, config.BatchSize), 60 stopChan: make(chan struct{}), 61 logger: logger, 62 } 63 64 if err := m.changeTTL(); err != nil { 65 db.Close() 66 return nil, fmt.Errorf("failed to initialize tables: %w", err) 67 } 68 69 go m.flushLoop() 70 return m, nil 71} 72 73func (m *ClickHouseMetrics) RecordQuery(metric QueryMetric) { 74 m.mu.Lock() 75 defer m.mu.Unlock() 76 77 m.queryBuffer = append(m.queryBuffer, metric) 78 if len(m.queryBuffer) >= m.config.BatchSize { 79 m.flush() 80 } 81} 82 83func (m *ClickHouseMetrics) RecordCacheMetrics(metric CacheMetric) { 84 m.mu.Lock() 85 defer m.mu.Unlock() 86 87 m.cacheBuffer = append(m.cacheBuffer, metric) 88 if len(m.cacheBuffer) >= m.config.BatchSize { 89 m.flush() 90 } 91} 92 93func (m 
*ClickHouseMetrics) RecordCacheStats(stats *dns.CacheStats) { 94 m.RecordCacheMetrics(CacheMetric{ 95 Timestamp: time.Now(), 96 TotalQueries: stats.TotalQueries, 97 CacheHits: stats.CacheHits, 98 CacheMisses: stats.CacheMisses, 99 NegativeHits: stats.NegativeHits, 100 PositiveHits: stats.PositiveHits, 101 Evictions: stats.Evictions, 102 Size: stats.Size, 103 }) 104} 105 106func (m *ClickHouseMetrics) flushLoop() { 107 ticker := time.NewTicker(m.config.FlushInterval.Duration) 108 defer ticker.Stop() 109 110 for { 111 select { 112 case <-ticker.C: 113 m.mu.Lock() 114 m.flush() 115 m.mu.Unlock() 116 case <-m.stopChan: 117 return 118 } 119 } 120} 121 122func (m *ClickHouseMetrics) flush() { 123 if len(m.queryBuffer) > 0 { 124 if err := m.flushQueries(); err != nil { 125 m.logger.Error("Failed to flush query metrics", "error", err) 126 } 127 m.queryBuffer = m.queryBuffer[:0] 128 } 129 130 if len(m.cacheBuffer) > 0 { 131 if err := m.flushCacheMetrics(); err != nil { 132 m.logger.Error("Failed to flush cache metrics", "error", err) 133 } 134 m.cacheBuffer = m.cacheBuffer[:0] 135 } 136} 137 138func (m *ClickHouseMetrics) changeTTL() error { 139 if _, err := m.db.Exec( 140 "ALTER TABLE alky_dns_queries MODIFY TTL timestamp + toIntervalSecond(?)", 141 int(m.config.RetentionPeriod.Seconds()), 142 ); err != nil { 143 return fmt.Errorf("failed to update alky_dns_queries TTL: %w", err) 144 } 145 146 if _, err := m.db.Exec( 147 "ALTER TABLE alky_dns_cache_metrics MODIFY TTL timestamp + toIntervalSecond(?)", 148 int(m.config.RetentionPeriod.Seconds()), 149 ); err != nil { 150 return fmt.Errorf("failed to update alky_dns_cache_metrics TTL: %w", err) 151 } 152 153 return nil 154} 155 156func (m *ClickHouseMetrics) flushQueries() error { 157 tx, err := m.db.Begin() 158 if err != nil { 159 return err 160 } 161 defer tx.Rollback() 162 163 stmt, err := tx.Prepare(` 164 INSERT INTO alky_dns_queries ( 165 timestamp, instance_id, query_name, query_type, query_class, 166 remote_addr, 
response_code, duration, cache_hit 167 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 168 `) 169 if err != nil { 170 return err 171 } 172 defer stmt.Close() 173 174 for _, metric := range m.queryBuffer { 175 _, err := stmt.Exec( 176 metric.Timestamp, 177 metric.InstanceID, 178 metric.QueryName, 179 metric.QueryType, 180 metric.QueryClass, 181 metric.RemoteAddr, 182 metric.ResponseCode, 183 metric.Duration, 184 metric.CacheHit, 185 ) 186 if err != nil { 187 return err 188 } 189 } 190 191 return tx.Commit() 192} 193 194func (m *ClickHouseMetrics) flushCacheMetrics() error { 195 tx, err := m.db.Begin() 196 if err != nil { 197 return err 198 } 199 defer tx.Rollback() 200 201 stmt, err := tx.Prepare(` 202 INSERT INTO alky_dns_cache_metrics ( 203 timestamp, instance_id, total_queries, cache_hits, cache_misses, 204 negative_hits, positive_hits, evictions, size 205 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 206 `) 207 if err != nil { 208 return err 209 } 210 defer stmt.Close() 211 212 for _, metric := range m.cacheBuffer { 213 _, err := stmt.Exec( 214 metric.Timestamp, 215 GetInstanceID(), 216 metric.TotalQueries, 217 metric.CacheHits, 218 metric.CacheMisses, 219 metric.NegativeHits, 220 metric.PositiveHits, 221 metric.Evictions, 222 metric.Size, 223 ) 224 if err != nil { 225 return err 226 } 227 } 228 229 return tx.Commit() 230} 231 232func (m *ClickHouseMetrics) Close() error { 233 close(m.stopChan) 234 m.mu.Lock() 235 defer m.mu.Unlock() 236 m.flush() 237 return m.db.Close() 238}