a recursive dns resolver
package metrics

import (
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"sync"
	"time"

	_ "github.com/ClickHouse/clickhouse-go/v2"

	"tangled.sh/seiso.moe/alky/pkg/config"
)

// ClickHouseMetrics buffers DNS query metrics in memory and flushes them to
// ClickHouse in batches, either when the batch size is reached or on a timer.
type ClickHouseMetrics struct {
	db          *sql.DB
	config      *config.MetricsConfig
	queryBuffer []QueryMetric
	mu          sync.Mutex
	stopChan    chan struct{}
	wg          sync.WaitGroup
	logger      *slog.Logger
}

// QueryMetric is a single resolved DNS query as stored in alky_dns_queries.
type QueryMetric struct {
	Timestamp    time.Time
	InstanceID   string
	QueryName    string
	QueryType    string
	QueryClass   string
	RemoteAddr   string
	ResponseCode string
	Duration     int64
}

// NewClickHouseMetrics connects to ClickHouse, aligns the table TTL with the
// configured retention period, and starts the background flush loop.
func NewClickHouseMetrics(config *config.MetricsConfig, logger *slog.Logger) (*ClickHouseMetrics, error) {
	if logger == nil {
		logger = slog.Default()
	}
	logger.Info("Connecting to ClickHouse", "dsn", config.DSN)

	db, err := sql.Open("clickhouse", config.DSN)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize ClickHouse driver: %w", err)
	}

	if err := db.Ping(); err != nil {
		db.Close()
		return nil, fmt.Errorf("failed to connect to ClickHouse (%s): %w", config.DSN, err)
	}
	logger.Info("Successfully connected to ClickHouse")

	m := &ClickHouseMetrics{
		db:          db,
		config:      config,
		queryBuffer: make([]QueryMetric, 0, config.BatchSize),
		stopChan:    make(chan struct{}),
		logger:      logger,
	}

	if err := m.checkAndUpdateTTL(); err != nil {
		logger.Warn("Failed to check/update table TTLs, using existing TTL", "error", err)
	} else {
		logger.Info("Table TTLs verified/updated", "retention", config.RetentionPeriod.Duration)
	}

	m.wg.Add(1)
	go m.flushLoop()
	return m, nil
}

// RecordQuery appends a metric to the buffer and flushes early once the
// configured batch size is reached.
func (m *ClickHouseMetrics) RecordQuery(metric QueryMetric) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.queryBuffer = append(m.queryBuffer, metric)
	if len(m.queryBuffer) >= m.config.BatchSize {
		m.flushQueriesLocked()
	}
}

// flushLoop flushes the buffer on every tick and once more on shutdown.
func (m *ClickHouseMetrics) flushLoop() {
	defer m.wg.Done()
	ticker := time.NewTicker(m.config.FlushInterval.Duration)
	defer ticker.Stop()

	m.logger.Info("Metrics flush loop started", "interval", m.config.FlushInterval.Duration)

	for {
		select {
		case <-ticker.C:
			m.mu.Lock()
			m.flushLocked()
			m.mu.Unlock()
		case <-m.stopChan:
			m.logger.Info("Metrics flush loop received stop signal. Flushing remaining data...")
			m.mu.Lock()
			m.flushLocked()
			m.mu.Unlock()
			m.logger.Info("Metrics flush loop stopped.")
			return
		}
	}
}

// flushLocked flushes any buffered metrics; callers must hold m.mu.
func (m *ClickHouseMetrics) flushLocked() {
	if len(m.queryBuffer) > 0 {
		m.flushQueriesLocked()
	}
}

// checkAndUpdateTTL aligns the table TTL with the configured retention period.
func (m *ClickHouseMetrics) checkAndUpdateTTL() error {
	retentionSeconds := int(m.config.RetentionPeriod.Duration.Seconds())
	if retentionSeconds <= 0 {
		m.logger.Warn("Invalid retention period configured, skipping TTL update", "retention", m.config.RetentionPeriod.Duration)
		return nil
	}

	m.logger.Info("Checking/Updating table TTLs", "retention_seconds", retentionSeconds)

	updateTableTTL := func(tableName string) error {
		query := fmt.Sprintf("ALTER TABLE %s MODIFY TTL timestamp + toIntervalSecond(?)", tableName)
		m.logger.Debug("Executing TTL update query", "query", query)
		_, err := m.db.Exec(query, retentionSeconds)
		if err != nil && errors.Is(err, sql.ErrNoRows) {
			m.logger.Warn("Table not found while updating TTL, likely needs migration", "table", tableName)
			return nil
		} else if err != nil {
			return fmt.Errorf("failed to update %s TTL: %w", tableName, err)
		}
		m.logger.Debug("TTL updated successfully", "table", tableName)
		return nil
	}

	if err := updateTableTTL("alky_dns_queries"); err != nil {
		return err
	}

	return nil
}

// flushQueriesLocked writes the buffered metrics in a single transaction;
// callers must hold m.mu.
func (m *ClickHouseMetrics) flushQueriesLocked() {
	if len(m.queryBuffer) == 0 {
		return
	}

	m.logger.Debug("Flushing query metrics", "count", len(m.queryBuffer))
	start := time.Now()

	tx, err := m.db.Begin()
	if err != nil {
		m.logger.Error("Failed to begin transaction for query metrics", "error", err)
		m.queryBuffer = m.queryBuffer[:0]
		return
	}
	defer func() {
		if err != nil {
			if rbErr := tx.Rollback(); rbErr != nil {
				m.logger.Error("Failed to rollback transaction for query metrics", "error", rbErr)
			}
		}
	}()

	stmt, err := tx.Prepare(`
		INSERT INTO alky_dns_queries (
			timestamp, instance_id, query_name, query_type, query_class,
			remote_addr, response_code, duration, cache_hit
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
	`)
	if err != nil {
		m.logger.Error("Failed to prepare statement for query metrics", "error", err)
		m.queryBuffer = m.queryBuffer[:0]
		return
	}
	defer stmt.Close()

	count := 0
	for _, metric := range m.queryBuffer {
		_, err = stmt.Exec(
			metric.Timestamp,
			metric.InstanceID,
			metric.QueryName,
			metric.QueryType,
			metric.QueryClass,
			metric.RemoteAddr,
			metric.ResponseCode,
			metric.Duration,
			false, // cache_hit is not tracked yet; always recorded as false
		)
		if err != nil {
			// Keep the buffer so the batch is retried on the next flush.
			m.logger.Error("Failed to execute statement for query metric", "error", err, "metric_index", count)
			return
		}
		count++
	}

	err = tx.Commit()
	if err != nil {
		m.logger.Error("Failed to commit transaction for query metrics", "error", err)
		return
	}

	m.logger.Debug("Successfully flushed query metrics", "count", count, "duration", time.Since(start))
	m.queryBuffer = m.queryBuffer[:0]
}

// Close stops the flush loop, flushes any remaining metrics, and closes the
// database connection.
func (m *ClickHouseMetrics) Close() error {
	m.logger.Info("Closing metrics client...")
	close(m.stopChan)
	m.wg.Wait()
	m.logger.Info("Flush loop stopped. Closing database connection.")
	return m.db.Close()
}
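The INSERT and the ALTER TABLE ... MODIFY TTL above both assume an alky_dns_queries table that already exists (checkAndUpdateTTL only logs a warning when it does not). A minimal sketch of a compatible schema, kept as a Go constant so it stays in the file's language, could look like the following; the engine, ORDER BY key, and exact column types are assumptions, not anything taken from the repository:

package metrics

// createDNSQueriesDDL is a hypothetical schema for the alky_dns_queries table
// that flushQueriesLocked and checkAndUpdateTTL expect. Column types, engine,
// and ordering key are assumptions; checkAndUpdateTTL applies the retention
// TTL separately, so none is hardcoded here.
const createDNSQueriesDDL = `
CREATE TABLE IF NOT EXISTS alky_dns_queries (
    timestamp     DateTime,
    instance_id   String,
    query_name    String,
    query_type    String,
    query_class   String,
    remote_addr   String,
    response_code String,
    duration      Int64,
    cache_hit     Bool
) ENGINE = MergeTree
ORDER BY (timestamp, instance_id)
`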
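For context, a minimal sketch of how the resolver's serve path could feed this client after answering a query is shown below. The handler shape, the instance label, and the duration unit are illustrative assumptions; only the calls into the metrics package mirror the file above.

package resolver

import (
	"time"

	"tangled.sh/seiso.moe/alky/pkg/metrics"
)

// recordAnswered hands one answered query to the metrics client. The buffer
// and batching logic live entirely inside RecordQuery, so the hot path only
// pays for a mutex and an append.
func recordAnswered(m *metrics.ClickHouseMetrics, name, qtype, qclass, remote, rcode string, started time.Time) {
	m.RecordQuery(metrics.QueryMetric{
		Timestamp:    started,
		InstanceID:   "resolver-1", // hypothetical instance label
		QueryName:    name,
		QueryType:    qtype,
		QueryClass:   qclass,
		RemoteAddr:   remote,
		ResponseCode: rcode,
		Duration:     time.Since(started).Milliseconds(), // unit is an assumption; the field is a bare int64
	})
}

On shutdown the resolver would call Close(), which stops the flush loop, writes whatever is still buffered, and closes the connection.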