a recursive DNS resolver


docker-compose.yml  +1 -1
···
        soft: 262144
        hard: 262144
    healthcheck:
-     test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1
+     test: curl http://localhost:8123/ping || exit 1
      interval: 30s
      timeout: 5s
      retries: 3
migrations/00002_modified_metrics.sql  +60
···
+-- +goose Up
+ALTER TABLE alky_dns_queries
+    MODIFY COLUMN timestamp DateTime CODEC(Delta, ZSTD(1)),
+    MODIFY COLUMN instance_id String CODEC(ZSTD(1)),
+    MODIFY COLUMN query_name String CODEC(ZSTD(1)),
+    MODIFY COLUMN query_type LowCardinality(String) CODEC(ZSTD(1)),
+    MODIFY COLUMN query_class LowCardinality(String) CODEC(ZSTD(1)),
+    MODIFY COLUMN remote_addr String CODEC(ZSTD(1)),
+    MODIFY COLUMN response_code LowCardinality(String) CODEC(ZSTD(1)),
+    MODIFY COLUMN duration Int64 CODEC(T64, ZSTD(1)),
+    MODIFY COLUMN cache_hit Bool CODEC(ZSTD(1));
+
+ALTER TABLE alky_dns_queries MODIFY TTL timestamp + INTERVAL 30 DAY;
+
+ALTER TABLE alky_dns_cache_metrics
+    DROP COLUMN IF EXISTS total_queries,
+    MODIFY COLUMN timestamp DateTime CODEC(Delta, ZSTD(1)),
+    MODIFY COLUMN instance_id String CODEC(ZSTD(1)),
+    MODIFY COLUMN cache_hits Int64 CODEC(T64, ZSTD(1)),
+    MODIFY COLUMN cache_misses Int64 CODEC(T64, ZSTD(1)),
+    MODIFY COLUMN negative_hits Int64 CODEC(T64, ZSTD(1)),
+    MODIFY COLUMN positive_hits Int64 CODEC(T64, ZSTD(1)),
+    MODIFY COLUMN evictions Int64 CODEC(T64, ZSTD(1)),
+    MODIFY COLUMN size Int64 CODEC(T64, ZSTD(1));
+
+ALTER TABLE alky_dns_cache_metrics
+    ADD COLUMN IF NOT EXISTS expired_count Int64 CODEC(T64, ZSTD(1));
+
+ALTER TABLE alky_dns_cache_metrics MODIFY TTL timestamp + INTERVAL 30 DAY;
+
+-- +goose Down
+ALTER TABLE alky_dns_queries
+    MODIFY COLUMN timestamp DateTime,
+    MODIFY COLUMN instance_id String,
+    MODIFY COLUMN query_name String,
+    MODIFY COLUMN query_type String,
+    MODIFY COLUMN query_class String,
+    MODIFY COLUMN remote_addr String,
+    MODIFY COLUMN response_code String,
+    MODIFY COLUMN duration Int64,
+    MODIFY COLUMN cache_hit Bool;
+
+ALTER TABLE alky_dns_queries MODIFY TTL timestamp + toIntervalDay(30);
+
+ALTER TABLE alky_dns_cache_metrics
+    ADD COLUMN IF NOT EXISTS total_queries Int64 AFTER instance_id;
+
+ALTER TABLE alky_dns_cache_metrics
+    MODIFY COLUMN timestamp DateTime,
+    MODIFY COLUMN instance_id String,
+    MODIFY COLUMN cache_hits Int64,
+    MODIFY COLUMN cache_misses Int64,
+    MODIFY COLUMN negative_hits Int64,
+    MODIFY COLUMN positive_hits Int64,
+    MODIFY COLUMN evictions Int64,
+    MODIFY COLUMN size Int;
+
+ALTER TABLE alky_dns_cache_metrics DROP COLUMN IF EXISTS expired_count;
+
+ALTER TABLE alky_dns_cache_metrics MODIFY TTL timestamp + toIntervalDay(30);
pkg/metrics/clickhouse.go  +2 -97
···
    db          *sql.DB
    config      *config.MetricsConfig
    queryBuffer []QueryMetric
-   cacheBuffer []CacheMetric
    mu          sync.Mutex
    stopChan    chan struct{}
    wg          sync.WaitGroup
···
    RemoteAddr   string
    ResponseCode string
    Duration     int64
-   CacheHit     bool
-}
-
-type CacheMetric struct {
-   Timestamp    time.Time
-   CacheHits    int64
-   CacheMisses  int64
-   NegativeHits int64
-   PositiveHits int64
-   Evictions    int64
-   Size         int64
}
func NewClickHouseMetrics(config *config.MetricsConfig, logger *slog.Logger) (*ClickHouseMetrics, error) {
···
        db:          db,
        config:      config,
        queryBuffer: make([]QueryMetric, 0, config.BatchSize),
-       cacheBuffer: make([]CacheMetric, 0, config.BatchSize),
        stopChan:    make(chan struct{}),
        logger:      logger,
    }
···
    }
}
-func (m *ClickHouseMetrics) RecordCacheStats(metric CacheMetric) {
-   m.mu.Lock()
-   defer m.mu.Unlock()
-
-   m.cacheBuffer = append(m.cacheBuffer, metric)
-   if len(m.cacheBuffer) >= m.config.BatchSize {
-       m.flushCacheMetricsLocked()
-   }
-}
-
func (m *ClickHouseMetrics) flushLoop() {
    defer m.wg.Done()
    ticker := time.NewTicker(m.config.FlushInterval.Duration)
···
        if len(m.queryBuffer) > 0 {
            m.flushQueriesLocked()
        }
-       if len(m.cacheBuffer) > 0 {
-           m.flushCacheMetricsLocked()
-       }
    }
func (m *ClickHouseMetrics) checkAndUpdateTTL() error {
···
    if err := updateTableTTL("alky_dns_queries"); err != nil {
        return err
    }
-   if err := updateTableTTL("alky_dns_cache_metrics"); err != nil {
-       return err
-   }
    return nil
}
···
        INSERT INTO alky_dns_queries (
            timestamp, instance_id, query_name, query_type, query_class,
            remote_addr, response_code, duration, cache_hit
-       ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+       ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    `)
    if err != nil {
        m.logger.Error("Failed to prepare statement for query metrics", "error", err)
···
            metric.RemoteAddr,
            metric.ResponseCode,
            metric.Duration,
-           metric.CacheHit,
+           false,
        )
        if err != nil {
            m.logger.Error("Failed to execute statement for query metric", "error", err, "metric_index", count)
···
    m.queryBuffer = m.queryBuffer[:0]
}
-func (m *ClickHouseMetrics) flushCacheMetricsLocked() {
-   if len(m.cacheBuffer) == 0 {
-       return
-   }
-
-   m.logger.Debug("Flushing cache metrics", "count", len(m.cacheBuffer))
-   start := time.Now()
-
-   tx, err := m.db.Begin()
-   if err != nil {
-       m.logger.Error("Failed to begin transaction for cache metrics", "error", err)
-       m.cacheBuffer = m.cacheBuffer[:0]
-       return
-   }
-   defer func() {
-       if err != nil {
-           if rbErr := tx.Rollback(); rbErr != nil {
-               m.logger.Error("Failed to rollback transaction for cache metrics", "error", rbErr)
-           }
-       }
-   }()
-
-   stmt, err := tx.Prepare(`
-       INSERT INTO alky_dns_cache_metrics (
-           timestamp, instance_id, cache_hits, cache_misses,
-           negative_hits, positive_hits, evictions, size
-       ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
-   `)
-   if err != nil {
-       m.logger.Error("Failed to prepare statement for cache metrics", "error", err)
-       m.cacheBuffer = m.cacheBuffer[:0]
-       return
-   }
-   defer stmt.Close()
-
-   count := 0
-   for _, metric := range m.cacheBuffer {
-       instanceID := GetInstanceID()
-
-       _, err = stmt.Exec(
-           metric.Timestamp,
-           instanceID,
-           metric.CacheHits,
-           metric.CacheMisses,
-           metric.NegativeHits,
-           metric.PositiveHits,
-           metric.Evictions,
-           metric.Size,
-       )
-       if err != nil {
-           m.logger.Error("Failed to execute statement for cache metric", "error", err, "metric_index", count)
-           return
-       }
-       count++
-   }
-
-   err = tx.Commit()
-   if err != nil {
-       m.logger.Error("Failed to commit transaction for cache metrics", "error", err)
-       return
-   }
-
-   m.logger.Debug("Successfully flushed cache metrics", "count", count, "duration", time.Since(start))
-   m.cacheBuffer = m.cacheBuffer[:0]
-}
-
func (m *ClickHouseMetrics) Close() error {
    m.logger.Info("Closing metrics client...")
    close(m.stopChan)
pkg/rootservers/loader.go  +4
···
    }
    for _, line := range bytes.Split(data, []byte{'\n'}) {
+       if len(line) == 0 {
+           continue
+       }
+
        // skip comments
        if line[0] == ';' {
            continue
docs/alky.toml  -7
···
# This uses time.ParseDuration semantics
retention_period = "720h"
-[cache]
-# The maximum number of items to store in the cache.
-max_items = 5000
-
-# How often the cache will evict items.
-cleanup_interval = "5m"
-
[advanced]
# Timeout (in milliseconds) for outgoing queries before being cancelled.
query_timeout = 100
go.mod  +1 -1
···
    github.com/ClickHouse/clickhouse-go/v2 v2.34.0
    github.com/stretchr/testify v1.10.0
    golang.org/x/time v0.11.0
-   tangled.sh/seiso.moe/magna v0.0.1
+   tangled.sh/seiso.moe/magna v0.0.2
)
require (
go.sum  +2
···
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
tangled.sh/seiso.moe/magna v0.0.1 h1:v8GM2y3xEinc0jGVxYf/33xtWJ74ES9EuTaMxXL8zxo=
tangled.sh/seiso.moe/magna v0.0.1/go.mod h1:bqm+DTo2Pv4ITT0EnR079l++BJgoChBswSB/3KeijUk=
+tangled.sh/seiso.moe/magna v0.0.2 h1:4VGPlqv/7tVyTtsR4Qkk8ZNypuNbmaeLogWzkpbHrRs=
+tangled.sh/seiso.moe/magna v0.0.2/go.mod h1:bqm+DTo2Pv4ITT0EnR079l++BJgoChBswSB/3KeijUk=
pkg/dns/ratelimit.go  +7 -7
···
}
type rateLimiter struct {
-   config RateLimitConfig
-   limiters map[string]*ipRateLimiterEntry
-   mu sync.RWMutex
+   config      RateLimitConfig
+   limiters    map[string]*ipRateLimiterEntry
+   mu          sync.RWMutex
    stopCleanup chan struct{}
}
···
func newRateLimiter(config RateLimitConfig) *rateLimiter {
    rl := &rateLimiter{
-       config: config,
-       limiters: make(map[string]*ipRateLimiterEntry),
+       config:      config,
+       limiters:    make(map[string]*ipRateLimiterEntry),
        stopCleanup: make(chan struct{}),
    }
···
}
func (rl *rateLimiter) allow(ip string) bool {
-   rl.mu.Lock()
+   rl.mu.Lock()
    defer rl.mu.Unlock()
    entry, exists := rl.limiters[ip]
···
    if !exists {
        limiter := rate.NewLimiter(rate.Limit(rl.config.Rate), rl.config.Burst)
        entry := &ipRateLimiterEntry{
-           limiter: limiter,
+           limiter:    limiter,
            lastAccess: now,
        }
pkg/dns/server.go  +2 -2
···
)
var (
-   serverUDPBufferPool = sync.Pool {
+   serverUDPBufferPool = sync.Pool{
        New: func() any {
            b := make([]byte, maxUDPBufferSize)
            return &b
        },
    }
-   resolverUDPBufferPool = sync.Pool {
+   resolverUDPBufferPool = sync.Pool{
        New: func() any {
            b := make([]byte, maxResolverUDPBufferSize)
            return &b
pkg/metrics/middleware.go  +2 -4
···
        hostname = "unknown-host"
    }
-   pid := os.Getpid()
-
    if version != "" {
-       instanceID = fmt.Sprintf("%s-%d-%s", hostname, pid, version)
+       instanceID = fmt.Sprintf("%s-%s", hostname, version)
    } else {
-       instanceID = fmt.Sprintf("%s-%d-dev", hostname, pid)
+       instanceID = fmt.Sprintf("%s-dev", hostname)
    }
}