a recursive dns resolver

initial commit for metrics

+3
.env
···
+
GOOSE_DRIVER="clickhouse"
+
GOOSE_MIGRATION_DIR="./migrations/"
+
GOOSE_DBSTRING="clickhouse://default:clickhouse@localhost:9000/default?dial_timeout=10s&compress=true"
+17
Justfile
···
+
init:
+
goose status
+
+
format:
+
go fmt ./...
+
fd .go . | xargs gofumpt -l -w
+
go mod tidy
+
+
version:
+
#!/usr/bin/env sh
+
VERSION=$(git describe --tags 2>/dev/null || git rev-parse --short HEAD)
+
echo "$VERSION"
+
+
build: format
+
#!/usr/bin/env sh
+
VERSION=$(just version)
+
go build -ldflags "-X code.kiri.systems/kiri/alky/pkg/metrics.version=$VERSION" .
+25
docker-compose.yml
···
+
services:
+
clickhouse:
+
image: clickhouse/clickhouse-server:latest
+
container_name: clickhouse
+
network_mode: host
+
environment:
+
- CLICKHOUSE_DB=default
+
- CLICKHOUSE_USER=default
+
- CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1
+
- CLICKHOUSE_PASSWORD=clickhouse
+
ports:
+
- "8123:8123"
+
- "9000:9000"
+
- "9009:9009"
+
ulimits:
+
nofile:
+
soft: 262144
+
hard: 262144
+
healthcheck:
+
test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1
+
interval: 30s
+
timeout: 5s
+
retries: 3
+
start_period: 30s
+
restart: unless-stopped
+14
docs/alky.toml
···
# Type: Integer
expiration_time = 300
+
[metrics]
+
# ClickHouse connection string
+
dsn = "clickhouse://default:clickhouse@localhost:9000/default"
+
+
# Number of metrics to buffer before sending to ClickHouse
+
batch_size = 1000
+
+
# How often to flush metrics to ClickHouse
+
flush_interval = "10s"
+
+
# How long to retain metrics data
+
# This uses time.ParseDuration semantics
+
retention_period = "720h"
+
[advanced]
# Timeout (in milliseconds) for outgoing queries before being cancelled.
query_timeout = 100
+20 -1
go.mod
···
go 1.22.5
require (
-
code.kiri.systems/kiri/magna v0.0.0-20240922043826-2c2a1c508469
+
code.kiri.systems/kiri/magna v0.0.0-20250211050847-abb0522bcd30
github.com/BurntSushi/toml v1.4.0
+
github.com/ClickHouse/clickhouse-go/v2 v2.31.0
+
)
+
+
require (
+
github.com/ClickHouse/ch-go v0.64.1 // indirect
+
github.com/andybalholm/brotli v1.1.1 // indirect
+
github.com/go-faster/city v1.0.1 // indirect
+
github.com/go-faster/errors v0.7.1 // indirect
+
github.com/google/uuid v1.6.0 // indirect
+
github.com/klauspost/compress v1.17.11 // indirect
+
github.com/paulmach/orb v0.11.1 // indirect
+
github.com/pierrec/lz4/v4 v4.1.22 // indirect
+
github.com/pkg/errors v0.9.1 // indirect
+
github.com/segmentio/asm v1.2.0 // indirect
+
github.com/shopspring/decimal v1.4.0 // indirect
+
go.opentelemetry.io/otel v1.34.0 // indirect
+
go.opentelemetry.io/otel/trace v1.34.0 // indirect
+
golang.org/x/sys v0.30.0 // indirect
+
gopkg.in/yaml.v3 v3.0.1 // indirect
)
+105 -6
go.sum
···
-
code.kiri.systems/kiri/magna v0.0.0-20240721214902-8d0a079dbd84 h1:igzBX4k3REg0WZExjGLWW7/wu/X+U6QlbMc8aeO2030=
-
code.kiri.systems/kiri/magna v0.0.0-20240721214902-8d0a079dbd84/go.mod h1:gSzCiTKyKlUEjGgl/qTb8rxF0QUVuWOEORAsTXA0qyI=
-
code.kiri.systems/kiri/magna v0.0.0-20240922043826-2c2a1c508469 h1:LUvvGcJ7DuW3eo7yblNH2igCJzYsbWJQ08iZEXBWplc=
-
code.kiri.systems/kiri/magna v0.0.0-20240922043826-2c2a1c508469/go.mod h1:gSzCiTKyKlUEjGgl/qTb8rxF0QUVuWOEORAsTXA0qyI=
+
code.kiri.systems/kiri/magna v0.0.0-20250211050847-abb0522bcd30 h1:ORu6TXli7rdqczAOE3Mi+Xc4IlzcgEpNXjUWeNLoqxg=
+
code.kiri.systems/kiri/magna v0.0.0-20250211050847-abb0522bcd30/go.mod h1:gSzCiTKyKlUEjGgl/qTb8rxF0QUVuWOEORAsTXA0qyI=
github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0=
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
+
github.com/ClickHouse/ch-go v0.64.1 h1:FWpP+QU4KchgzpEekuv8YoI/fUc4H2r6Bwc5WwrzvcI=
+
github.com/ClickHouse/ch-go v0.64.1/go.mod h1:RBUynvczWwVzhS6Up9lPKlH1mrk4UAmle6uzCiW4Pkc=
+
github.com/ClickHouse/clickhouse-go/v2 v2.31.0 h1:9MNHRDYXjFTJizGEJM1DfYAqdra/ohprPoZ+LPiuHXQ=
+
github.com/ClickHouse/clickhouse-go/v2 v2.31.0/go.mod h1:V1aZaG0ctMbd8KVi+D4loXi97duWYtHiQHMCgipKJcI=
+
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
+
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+
github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
+
github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
+
github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg=
+
github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo=
+
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
+
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
+
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
+
github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU=
+
github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
+
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
+
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
+
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
-
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
+
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
+
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
+
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
+
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
+
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
+
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
+
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
+
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
+
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
+
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
+
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
+
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
+
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
+
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+53 -21
main.go
···
"code.kiri.systems/kiri/alky/pkg/config"
"code.kiri.systems/kiri/alky/pkg/dns"
+
"code.kiri.systems/kiri/alky/pkg/metrics"
"code.kiri.systems/kiri/alky/pkg/rootservers"
)
···
log.Fatal(err)
}
-
rootServers, err := rootservers.DecodeRootHints(cfg.Server.RootHintsFile)
+
logger := setupLogger(&cfg)
+
+
metricsClient, err := metrics.NewClickHouseMetrics(&cfg.Metrics, logger)
if err != nil {
log.Fatal(err)
}
-
-
var logger *slog.Logger
-
switch cfg.Logging.Output {
-
case "file":
-
f, err := os.OpenFile(cfg.Logging.FilePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o644)
-
if err != nil {
-
log.Fatal(err)
-
}
+
defer metricsClient.Close()
-
logger = slog.New(slog.NewJSONHandler(f, nil))
-
case "stdout":
-
fallthrough
-
default:
-
logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
+
rootServers, err := rootservers.DecodeRootHints(cfg.Server.RootHintsFile)
+
if err != nil {
+
log.Fatal(err)
}
-
memCache := dns.NewMemoryCache()
-
var cache dns.Cache = memCache
+
cache := dns.NewMemoryCache(5000, 5*time.Minute)
handler := &dns.QueryHandler{
RootServers: rootServers,
Timeout: time.Duration(cfg.Advanced.QueryTimeout) * time.Second,
-
Cache: &cache,
+
Cache: cache,
+
Logger: logger,
}
-
logConfig := &dns.LogConfig{Logger: logger}
+
go monitorCacheMetrics(cache, metricsClient, logger)
rateLimitHandler := dns.RateLimitMiddleware(&dns.RateLimitConfig{
Rate: float64(cfg.Ratelimit.Rate),
···
WindowLength: time.Duration(cfg.Ratelimit.Window) * time.Second,
ExpirationTime: time.Duration(cfg.Ratelimit.ExpirationTime) * time.Second,
})(handler)
-
loggingHandler := dns.LoggingMiddleware(logConfig)(rateLimitHandler)
+
+
metricsHandler := metrics.MetricsMiddleware(metricsClient)(rateLimitHandler)
+
+
loggingHandler := dns.LoggingMiddleware(&dns.LogConfig{
+
Logger: logger,
+
Level: slog.LevelInfo,
+
})(metricsHandler)
s := dns.Server{
Address: cfg.Server.Address,
···
UDPSize: 512,
ReadTimeout: 2 * time.Second,
WriteTimeout: 2 * time.Second,
-
-
Logger: logger,
+
Logger: logger,
}
if err := s.ListenAndServe(); err != nil {
slog.Error("Failed to start server", "error", err)
}
}
+
+
func monitorCacheMetrics(cache *dns.MemoryCache, metricsClient *metrics.ClickHouseMetrics, logger *slog.Logger) {
+
ticker := time.NewTicker(1 * time.Minute)
+
defer ticker.Stop()
+
+
for range ticker.C {
+
stats := cache.GetStats()
+
metricsClient.RecordCacheStats(stats)
+
logger.Info("Cache metrics recorded to ClickHouse")
+
}
+
}
+
+
func setupLogger(cfg *config.Config) *slog.Logger {
+
var logger *slog.Logger
+
+
handlerOpts := &slog.HandlerOptions{
+
Level: slog.LevelDebug,
+
}
+
+
switch cfg.Logging.Output {
+
case "file":
+
f, err := os.OpenFile(cfg.Logging.FilePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o644)
+
if err != nil {
+
log.Fatal(err)
+
}
+
+
logger = slog.New(slog.NewJSONHandler(f, handlerOpts))
+
default:
+
logger = slog.New(slog.NewJSONHandler(os.Stdout, handlerOpts))
+
}
+
+
return logger
+
}
+34
migrations/00001_initial.sql
···
+
-- +goose Up
+
CREATE TABLE IF NOT EXISTS alky_dns_queries (
+
timestamp DateTime,
+
instance_id String,
+
query_name String,
+
query_type String,
+
query_class String,
+
remote_addr String,
+
response_code String,
+
duration Int64,
+
cache_hit Bool
+
) ENGINE = MergeTree()
+
PARTITION BY toYYYYMM(timestamp)
+
ORDER BY (timestamp, instance_id, query_name)
+
TTL timestamp + toIntervalDay(30);
+
+
CREATE TABLE IF NOT EXISTS alky_dns_cache_metrics (
+
timestamp DateTime,
+
instance_id String,
+
total_queries Int64,
+
cache_hits Int64,
+
cache_misses Int64,
+
negative_hits Int64,
+
positive_hits Int64,
+
evictions Int64,
+
size Int,
+
) ENGINE = MergeTree()
+
PARTITION BY toYYYYMM(timestamp)
+
ORDER BY (timestamp, instance_id)
+
TTL timestamp + toIntervalDay(30);
+
+
-- +goose Down
+
DROP TABLE IF EXISTS alky_dns_queries;
+
DROP TABLE IF EXISTS alky_dns_cache_metrics;
+35
pkg/config/config.go
···
import (
"fmt"
+
"time"
"github.com/BurntSushi/toml"
)
···
ExpirationTime int `toml:"expiration_time"`
}
+
// MetricsConfig holds the `[metrics]` section of the configuration file.
// LoadConfig fills in a default for every field that is left at its zero value.
type MetricsConfig struct {
+
// DSN is the ClickHouse connection string
// (default "clickhouse://localhost:9000/default").
DSN string `toml:"dsn"`
+
// BatchSize defaults to 1000 when unset.
BatchSize int `toml:"batch_size"`
+
// FlushInterval is parsed with time.ParseDuration; defaults to 10s.
FlushInterval duration `toml:"flush_interval"`
+
// RetentionPeriod is parsed with time.ParseDuration; defaults to 720h (30 days).
RetentionPeriod duration `toml:"retention_period"`
+
}
+
type AdvancedConfig struct {
QueryTimeout int `toml:"query_timeout"`
}
···
Server ServerConfig `toml:"server"`
Logging LoggingConfig `toml:"logging"`
Ratelimit RatelimitConfig `toml:"ratelimit"`
+
Metrics MetricsConfig `toml:"metrics"`
Advanced AdvancedConfig `toml:"advanced"`
}
+
// duration wraps time.Duration so that it can be decoded from a textual
// configuration value such as "10s" or "720h".
type duration struct {
	time.Duration
}

// UnmarshalText parses text using time.ParseDuration semantics and stores the
// result in d. The parse error, if any, is returned unchanged; in that case
// the stored duration is zero.
func (d *duration) UnmarshalText(text []byte) error {
	parsed, err := time.ParseDuration(string(text))
	d.Duration = parsed
	return err
}
+
func LoadConfig(path string) (Config, error) {
cfg := Config{}
if _, err := toml.DecodeFile(path, &cfg); err != nil {
···
if cfg.Logging.Output == "file" && cfg.Logging.FilePath == "" {
return cfg, fmt.Errorf("If `[logging.output]` is `file` then `file_path` must be set.")
+
}
+
+
if cfg.Metrics.DSN == "" {
+
cfg.Metrics.DSN = "clickhouse://localhost:9000/default"
+
}
+
+
if cfg.Metrics.BatchSize == 0 {
+
cfg.Metrics.BatchSize = 1000
+
}
+
+
if cfg.Metrics.FlushInterval.Duration == 0 {
+
cfg.Metrics.FlushInterval.Duration = 10 * time.Second
+
}
+
+
if cfg.Metrics.RetentionPeriod.Duration == 0 {
+
cfg.Metrics.RetentionPeriod.Duration = 30 * 24 * time.Hour
}
if cfg.Advanced.QueryTimeout == 0 {
+261 -12
pkg/dns/cache.go
···
package dns
import (
+
"log"
+
"strings"
"sync"
"time"
"code.kiri.systems/kiri/magna"
)
+
// BailiwickRule classifies a record's owner name relative to the zone it was
// received for; values are produced by determineBailiwickRule.
type BailiwickRule int
+
+
const (
+
// BailiwickSame: the record name is identical to the zone name.
BailiwickSame BailiwickRule = iota
+
// BailiwickChild: the record name lies beneath the zone name.
BailiwickChild
+
// BailiwickOutside: the record name is neither the zone nor beneath it.
BailiwickOutside
+
)
+
+
// CacheEntry is one cached DNS response, as built by CreateCacheEntry.
type CacheEntry struct {
+
// Answer, Authority and Additional hold the corresponding message sections.
// For negative entries only Authority is populated.
Answer []CachedResourceRecord
+
Authority []CachedResourceRecord
+
Additional []CachedResourceRecord
+
// NegativeTTL is set for negative entries only: the TTL of the first SOA
// record in the authority section, or 900 seconds when that is absent or zero.
NegativeTTL time.Duration
+
// ExpireAt is the instant after which MemoryCache.Get reports a miss.
ExpireAt time.Time
+
// CreateTime is when the entry was built; evictOldest removes the entry
// with the earliest CreateTime.
CreateTime time.Time
+
// IsNegative is true when the response code was NXDOMAIN.
IsNegative bool
+
// Bailiwick is the zone name the entry was created for.
Bailiwick string
+
}
+
type CachedResourceRecord struct {
-
Record magna.ResourceRecord
-
ExpireAt time.Time
+
Record magna.ResourceRecord
+
ExpireAt time.Time
+
BailiwickRule BailiwickRule
+
}
+
+
// MemoryCache is an in-process, map-backed cache of DNS responses keyed by
// string. Construct it with NewMemoryCache.
type MemoryCache struct {
+
// entries maps cache key to cached response.
entries map[string]*CacheEntry
+
// mu is taken by Set, Clear, GetStats and cleanup around access to entries and stats.
mu sync.RWMutex
+
// stats holds the running counters returned (as a copy) by GetStats.
stats CacheStats
+
// maxSize is the entry count at which Set evicts the oldest entry before inserting.
maxSize int
+
// cleaupInterval (sic) is the period of the periodicCleanup ticker.
cleaupInterval time.Duration
}
-
type CacheEntry struct {
-
Answer []CachedResourceRecord
+
// CacheStats is the set of counters maintained by MemoryCache.
type CacheStats struct {
+
// TotalQueries is incremented on every Get call.
TotalQueries int64
+
// CacheHits counts lookups that returned a live entry.
CacheHits int64
+
// CacheMisses counts lookups for keys that were absent or expired.
CacheMisses int64
+
// NegativeHits counts hits on entries with IsNegative set.
NegativeHits int64
+
// PositiveHits counts hits on entries without IsNegative set.
PositiveHits int64
+
// Evictions counts entries removed by evictOldest.
Evictions int64
+
// Size is the number of entries, refreshed by Set and cleanup.
Size int
}
type Cache interface {
Get(key string) (*CacheEntry, bool)
Set(key string, entry *CacheEntry)
-
}
-
-
type MemoryCache struct {
-
entries map[string]*CacheEntry
-
mu sync.RWMutex
+
Clear()
+
GetStats() *CacheStats
}
-
func NewMemoryCache() *MemoryCache {
-
return &MemoryCache{
-
entries: make(map[string]*CacheEntry),
+
func NewMemoryCache(maxSize int, cleanupInterval time.Duration) *MemoryCache {
+
cache := &MemoryCache{
+
entries: make(map[string]*CacheEntry),
+
maxSize: maxSize,
+
cleaupInterval: cleanupInterval,
}
+
+
go cache.periodicCleanup()
+
return cache
}
func (c *MemoryCache) Get(key string) (*CacheEntry, bool) {
c.mu.RLock()
c.mu.RUnlock()
+
c.stats.TotalQueries++
+
+
log.Println("looking for: ", key)
+
for k := range c.entries {
+
log.Printf("\t%s\n", k)
+
}
+
+
log.Println(c.entries[key])
+
entry, exists := c.entries[key]
if !exists {
+
log.Println("cache miss")
+
c.stats.CacheMisses++
return nil, false
}
+
+
if time.Now().After(entry.ExpireAt) {
+
c.stats.CacheMisses++
+
log.Println("cache expire")
+
return nil, false
+
}
+
+
if entry.IsNegative {
+
c.stats.NegativeHits++
+
} else {
+
c.stats.PositiveHits++
+
}
+
c.stats.CacheHits++
return entry, true
}
···
c.mu.Lock()
defer c.mu.Unlock()
+
if len(c.entries) >= c.maxSize {
+
c.evictOldest()
+
}
+
c.entries[key] = entry
+
c.stats.Size = len(c.entries)
+
}
+
+
func (c *MemoryCache) Clear() {
+
c.mu.Lock()
+
defer c.mu.Unlock()
+
+
c.entries = make(map[string]*CacheEntry)
+
c.stats = CacheStats{}
+
}
+
+
func (c *MemoryCache) GetStats() *CacheStats {
+
c.mu.RLock()
+
defer c.mu.RUnlock()
+
+
stats := c.stats
+
return &stats
+
}
+
+
func (c *MemoryCache) evictOldest() {
+
var oldestKey string
+
var oldestTime time.Time
+
+
first := true
+
for k, e := range c.entries {
+
if first || e.CreateTime.Before(oldestTime) {
+
oldestKey = k
+
oldestTime = e.CreateTime
+
first = false
+
}
+
}
+
+
if oldestKey != "" {
+
delete(c.entries, oldestKey)
+
c.stats.Evictions++
+
}
+
}
+
+
func (c *MemoryCache) periodicCleanup() {
+
ticker := time.NewTicker(c.cleaupInterval)
+
defer ticker.Stop()
+
+
for range ticker.C {
+
c.cleanup()
+
}
+
}
+
+
func (c *MemoryCache) cleanup() {
+
c.mu.Lock()
+
defer c.mu.Unlock()
+
+
now := time.Now()
+
for k, e := range c.entries {
+
if now.After(e.ExpireAt) {
+
delete(c.entries, k)
+
}
+
}
+
+
c.stats.Size = len(c.entries)
+
}
+
+
func getMinTTL(records []magna.ResourceRecord) uint32 {
+
if len(records) == 0 {
+
return 0
+
}
+
+
minTTL := records[0].TTL
+
for _, rr := range records[1:] {
+
if rr.TTL < minTTL {
+
minTTL = rr.TTL
+
}
+
}
+
return minTTL
+
}
+
+
func CreateCacheEntry(msg *magna.Message, zone string) *CacheEntry {
+
now := time.Now()
+
entry := &CacheEntry{
+
CreateTime: now,
+
IsNegative: msg.Header.RCode == magna.NXDOMAIN,
+
Bailiwick: zone,
+
}
+
+
if entry.IsNegative {
+
var soaTTL uint32
+
for _, auth := range msg.Authority {
+
if auth.RType == magna.SOAType {
+
soaTTL = auth.TTL
+
break
+
}
+
}
+
+
if soaTTL == 0 {
+
soaTTL = 900
+
}
+
+
entry.NegativeTTL = time.Duration(soaTTL) * time.Second
+
entry.ExpireAt = now.Add(entry.NegativeTTL)
+
entry.Authority = make([]CachedResourceRecord, len(msg.Authority))
+
for i, rr := range msg.Authority {
+
rule := determineBailiwickRule(zone, rr.Name)
+
entry.Authority[i] = CachedResourceRecord{
+
Record: rr,
+
ExpireAt: now.Add(time.Duration(rr.TTL) * time.Second),
+
BailiwickRule: rule,
+
}
+
}
+
+
return entry
+
}
+
+
minTTL := getMinTTL(msg.Answer)
+
entry.ExpireAt = now.Add(time.Duration(minTTL) * time.Second)
+
+
entry.Answer = make([]CachedResourceRecord, len(msg.Answer))
+
for i, rr := range msg.Answer {
+
rule := determineBailiwickRule(zone, rr.Name)
+
entry.Answer[i] = CachedResourceRecord{
+
Record: rr,
+
ExpireAt: now.Add(time.Duration(rr.TTL) * time.Second),
+
BailiwickRule: rule,
+
}
+
}
+
+
entry.Authority = make([]CachedResourceRecord, len(msg.Authority))
+
for i, rr := range msg.Authority {
+
rule := determineBailiwickRule(zone, rr.Name)
+
entry.Authority[i] = CachedResourceRecord{
+
Record: rr,
+
ExpireAt: now.Add(time.Duration(rr.TTL) * time.Second),
+
BailiwickRule: rule,
+
}
+
}
+
+
entry.Additional = make([]CachedResourceRecord, len(msg.Additional))
+
for i, rr := range msg.Additional {
+
rule := determineBailiwickRule(zone, rr.Name)
+
entry.Additional[i] = CachedResourceRecord{
+
Record: rr,
+
ExpireAt: now.Add(time.Duration(rr.TTL) * time.Second),
+
BailiwickRule: rule,
+
}
+
}
+
+
return entry
+
}
+
+
// isSubdomainOf reports whether the domain name b is equal to, or lies
// beneath, the zone name a. Note the argument order: a is the parent.
//
// Names are compared case-insensitively and a single trailing dot is ignored,
// so "Example.COM." and "example.com" are the same name. The root zone
// (written "." or as the empty string) is a parent of every name. Matching is
// done on whole labels: "ample.com" is not a parent of "example.com".
func isSubdomainOf(a, b string) bool {
	if a == b {
		return true
	}

	parent := strings.ToLower(strings.TrimSuffix(a, "."))
	child := strings.ToLower(strings.TrimSuffix(b, "."))

	// Everything lives under the root.
	if parent == "" {
		return true
	}

	// The leading dot forces the match onto a label boundary.
	return child == parent || strings.HasSuffix(child, "."+parent)
}
+
+
func determineBailiwickRule(zone, name string) BailiwickRule {
+
if zone == name {
+
return BailiwickSame
+
}
+
+
if isSubdomainOf(zone, name) {
+
return BailiwickChild
+
}
+
+
return BailiwickOutside
+
}
+
+
func extractZone(msg *magna.Message) string {
+
for _, auth := range msg.Authority {
+
if auth.RType == magna.SOAType {
+
return auth.Name
+
}
+
+
if auth.RType == magna.NSType {
+
return auth.Name
+
}
+
}
+
+
if len(msg.Question) > 0 {
+
return msg.Question[0].QName
+
}
+
+
return ""
}
+167 -110
pkg/dns/resolve.go
···
import (
"context"
"fmt"
+
"log"
+
"log/slog"
"net"
-
"strings"
"time"
"code.kiri.systems/kiri/magna"
···
type QueryHandler struct {
RootServers []string
Timeout time.Duration
-
Cache *Cache
+
Cache *MemoryCache
+
Logger *slog.Logger
}
type queryResponse struct {
···
}
func (h *QueryHandler) ServeDNS(w ResponseWriter, r *Request) {
-
msg := h.processQuery(r.Message.Encode())
-
w.WriteMsg(msg)
-
}
-
-
func (h *QueryHandler) processQuery(messageBuffer []byte) *magna.Message {
-
var query magna.Message
-
if err := query.Decode(messageBuffer); err != nil {
-
return nil
+
if len(r.Message.Question) < 1 {
+
h.Logger.Debug("received query with no questions")
+
msg := r.Message.CreateReply(r.Message)
+
msg = msg.SetRCode(magna.FORMERR)
+
w.WriteMsg(msg)
+
return
}
-
msg := new(magna.Message)
-
msg = msg.CreateReply(&query)
+
question := r.Message.Question[0]
+
answers, authority, err := h.resolveQuestion(r.Context, question, h.RootServers)
-
if len(query.Question) < 1 {
-
return msg.SetRCode(magna.FORMERR)
-
}
-
-
question := query.Question[0]
-
msg = msg.AddQuestion(question)
-
-
if question.QClass != magna.IN {
-
return msg.SetRCode(magna.NOTIMP)
-
}
+
msg := r.Message.CreateReply(r.Message)
+
msg.Header.RA = true
-
answer, err := h.resolveWithCache(question)
if err != nil {
-
return msg.SetRCode(magna.SERVFAIL)
+
if err.Error() == "nxdomain" {
+
msg = msg.SetRCode(magna.NXDOMAIN)
+
msg.Authority = authority
+
msg.Header.NSCount = uint16(len(authority))
+
} else {
+
msg = msg.SetRCode(magna.SERVFAIL)
+
}
+
} else {
+
msg.Answer = answers
+
msg.Header.ANCount = uint16(len(answers))
+
msg = msg.SetRCode(magna.NOERROR)
}
-
if len(answer) == 0 {
-
return msg.SetRCode(magna.NXDOMAIN)
-
}
+
w.WriteMsg(msg)
+
}
-
msg.Header.ANCount = uint16(len(answer))
-
msg.Answer = answer
-
return msg.SetRCode(magna.NOERROR)
+
func (h *QueryHandler) resolveQuestion(ctx context.Context, question magna.Question, servers []string) ([]magna.ResourceRecord, []magna.ResourceRecord, error) {
+
return h.resolveQuestionWithZone(ctx, question, servers, ".")
}
-
func (h *QueryHandler) resolveWithCache(question magna.Question) ([]magna.ResourceRecord, error) {
-
cacheKey := fmt.Sprintf("%s:%s:%s", strings.ToLower(question.QName), question.QType.String(), question.QClass.String())
+
func (h *QueryHandler) resolveQuestionWithZone(ctx context.Context, question magna.Question, servers []string, currentZone string) ([]magna.ResourceRecord, []magna.ResourceRecord, error) {
+
cacheKey := fmt.Sprintf("%s:%s:%s", question.QName, question.QType.String(), question.QClass.String())
-
if e, found := (*h.Cache).Get(cacheKey); found {
-
now := time.Now()
-
var updatedAnswer []magna.ResourceRecord
-
var cname *magna.ResourceRecord
-
hasAddressRecord := false
+
if h.Cache != nil {
+
log.Println("cache is set")
+
if entry, hit := h.Cache.Get(cacheKey); hit {
+
log.Println("womp womp here here", entry, hit)
+
log.Println("request context: ", getCurrentRequest(ctx))
+
now := time.Now()
-
for _, cachedRR := range e.Answer {
-
if now.Before(cachedRR.ExpireAt) {
-
updatedRR := cachedRR.Record
-
updatedRR.TTL = uint32(cachedRR.ExpireAt.Sub(now).Seconds())
-
updatedAnswer = append(updatedAnswer, updatedRR)
+
if r := getCurrentRequest(ctx); r != nil {
+
r.Context = setCacheHit(r.Context)
+
}
-
if updatedRR.RType == magna.CNAMEType && cname == nil {
-
cname = &updatedRR
-
} else if updatedRR.RType == question.QType {
-
hasAddressRecord = true
-
}
+
if entry.IsNegative {
+
return nil, convertCachedToMagna(entry.Authority, now), fmt.Errorf("nxdomain")
}
-
}
-
if len(updatedAnswer) > 0 {
-
// add AAAA types when magna supports those record types
-
if cname != nil && !hasAddressRecord && (question.QType == magna.AType) {
-
cnameTarget := cname.RData.String()
-
aRecords, err := h.resolveWithCache(magna.Question{QName: cnameTarget, QType: question.QType, QClass: question.QClass})
-
if err == nil && len(aRecords) > 0 {
-
updatedAnswer = append(updatedAnswer, aRecords...)
-
}
+
validAnswers := convertCachedToMagna(entry.Answer, now)
+
if len(validAnswers) > 0 {
+
return validAnswers, nil, nil
+
}
+
} else {
+
if r := getCurrentRequest(ctx); r != nil {
+
r.Context = setCacheMiss(r.Context)
}
-
return updatedAnswer, nil
}
}
-
answer, err := h.resolveQuestion(question, h.RootServers)
-
if err != nil {
-
return nil, err
-
}
-
-
now := time.Now()
-
cachedAnswer := make([]CachedResourceRecord, len(answer))
-
for i, rr := range answer {
-
cachedAnswer[i] = CachedResourceRecord{
-
Record: rr,
-
ExpireAt: now.Add(time.Duration(rr.TTL) * time.Second),
-
}
-
}
-
-
entry := &CacheEntry{
-
Answer: cachedAnswer,
-
}
-
(*h.Cache).Set(cacheKey, entry)
-
-
if len(answer) > 0 && answer[0].RType == magna.CNAMEType && question.QType == magna.AType {
-
cnameTarget := answer[len(answer)-1].RData.String()
-
addressRecords, err := h.resolveWithCache(magna.Question{QName: cnameTarget, QType: question.QType, QClass: question.QClass})
-
if err == nil && len(addressRecords) > 0 {
-
answer = append(answer, addressRecords...)
-
}
-
}
-
-
return answer, nil
-
}
-
-
func (h *QueryHandler) resolveQuestion(question magna.Question, servers []string) ([]magna.ResourceRecord, error) {
-
ctx, cancel := context.WithCancel(context.Background())
+
ctx, cancel := context.WithTimeout(ctx, h.Timeout)
defer cancel()
ch := make(chan queryResponse, len(servers))
-
for _, s := range servers {
go queryServer(ctx, question, s, ch, h.Timeout)
}
···
select {
case res := <-ch:
if res.Error != nil {
-
break
+
h.Logger.Debug("server query failed",
+
"server", res.Server,
+
"error", res.Error)
+
continue
}
msg := res.MSG
+
zone := extractZone(&msg)
+
if zone == "" {
+
zone = currentZone
+
}
+
+
if msg.Header.RCode == magna.NXDOMAIN {
+
entry := CreateCacheEntry(&msg, zone)
+
h.Cache.Set(cacheKey, entry)
+
now := time.Now()
+
return nil, convertCachedToMagna(entry.Authority, now), fmt.Errorf("nxdomain")
+
}
+
if msg.Header.ANCount > 0 {
if msg.Answer[0].RType == magna.CNAMEType {
-
cname_answers, err := h.resolveQuestion(magna.Question{QName: msg.Answer[0].RData.String(), QType: question.QType, QClass: question.QClass}, h.RootServers)
-
if err != nil {
-
continue
+
h.Logger.Debug("following CNAME",
+
"cname", msg.Answer[0].RData.String())
+
+
entry := CreateCacheEntry(&msg, zone)
+
h.Cache.Set(cacheKey, entry)
+
+
answers, auth, err := h.resolveQuestionWithZone(ctx, magna.Question{QName: msg.Answer[0].RData.String(), QType: question.QType, QClass: question.QClass}, h.RootServers, zone)
+
if err == nil {
+
return append(msg.Answer, answers...), auth, nil
}
-
msg.Answer = append(msg.Answer, cname_answers...)
+
+
return nil, auth, err
}
-
return msg.Answer, nil
+
entry := CreateCacheEntry(&msg, zone)
+
h.Cache.Set(cacheKey, entry)
+
return msg.Answer, nil, nil
}
if msg.Header.ARCount > 0 {
···
}
}
-
return h.resolveQuestion(question, nextZone)
+
if len(nextZone) > 0 {
+
return h.resolveQuestion(ctx, question, nextZone)
+
}
}
if msg.Header.NSCount > 0 {
-
var ns []string
-
for _, a := range msg.Authority {
-
if a.RType == magna.NSType {
-
ans, err := h.resolveQuestion(magna.Question{QName: a.RData.String(), QType: magna.AType, QClass: magna.IN}, h.RootServers)
-
if err != nil {
-
break
+
nsRecords := make(map[string]string)
+
glueRecords := make(map[string]string)
+
+
if msg.Header.ARCount > 0 {
+
for _, additional := range msg.Additional {
+
if additional.RType == magna.AType {
+
rule := determineBailiwickRule(zone, additional.Name)
+
if rule != BailiwickOutside {
+
glueRecords[additional.Name] = additional.RData.String()
+
}
}
-
for _, x := range ans {
-
ns = append(ns, x.RData.String())
+
}
+
}
+
+
var nextServers []string
+
var needResolution []string
+
+
for _, auth := range msg.Authority {
+
if auth.RType == magna.NSType {
+
rule := determineBailiwickRule(zone, auth.Name)
+
if rule != BailiwickOutside {
+
nsName := auth.RData.String()
+
nsRecords[nsName] = ""
+
+
if ip, exists := glueRecords[nsName]; exists {
+
nextServers = append(nextServers, ip)
+
} else {
+
needResolution = append(needResolution, nsName)
+
}
}
}
}
-
return h.resolveQuestion(question, ns)
+
if len(nextServers) > 0 {
+
h.Logger.Debug("using glue records for resolution",
+
"servers", nextServers)
+
return h.resolveQuestionWithZone(ctx, question, nextServers, zone)
+
}
+
+
for _, ns := range needResolution {
+
answers, _, err := h.resolveQuestionWithZone(
+
ctx,
+
magna.Question{
+
QName: ns,
+
QType: magna.AType,
+
QClass: magna.IN,
+
},
+
h.RootServers,
+
zone,
+
)
+
if err == nil {
+
for _, ans := range answers {
+
nextServers = append(nextServers, ans.RData.String())
+
}
+
}
+
}
+
+
if len(nextServers) > 0 {
+
return h.resolveQuestionWithZone(ctx, question, nextServers, zone)
+
}
}
-
return []magna.ResourceRecord{}, nil
-
case <-time.After(h.Timeout):
-
cancel()
+
case <-ctx.Done():
+
return nil, nil, ctx.Err()
}
}
-
return []magna.ResourceRecord{}, nil
+
return nil, nil, fmt.Errorf("all queries failed")
}
func queryServer(ctx context.Context, question magna.Question, server string, ch chan<- queryResponse, timeout time.Duration) {
···
ch <- queryResponse{Server: server, Error: fmt.Errorf("timeout")}
}
}
+
+
func convertCachedToMagna(cached []CachedResourceRecord, now time.Time) []magna.ResourceRecord {
+
result := make([]magna.ResourceRecord, 0, len(cached))
+
for _, record := range cached {
+
if now.Before(record.ExpireAt) {
+
rr := record.Record
+
rr.TTL = uint32(record.ExpireAt.Sub(now).Seconds())
+
result = append(result, rr)
+
}
+
}
+
return result
+
}
+
+
func getCurrentRequest(ctx context.Context) *Request {
+
log.Println(ctx)
+
if ctx == nil {
+
log.Println(">>>>")
+
return nil
+
}
+
if r, ok := ctx.Value(contextKey("request")).(*Request); ok {
+
log.Println("<<<<")
+
return r
+
}
+
log.Println("====")
+
return nil
+
}
+35 -5
pkg/dns/server.go
···
package dns
import (
+
"context"
"encoding/binary"
"fmt"
"io"
+
"log"
"log/slog"
"net"
"sync"
···
"code.kiri.systems/kiri/magna"
)
+
// contextKey is the package-private string type used for keys stored in a
// context.Context by this package.
type contextKey string
+
+
// Context keys defined by this package.
const (
+
cacheHitKey contextKey = "cache_hit" // holds the bool written by setCacheHit / setCacheMiss
+
)
+
+
func setCacheHit(ctx context.Context) context.Context {
+
log.Println("setting to true")
+
return context.WithValue(ctx, cacheHitKey, true)
+
}
+
+
func setCacheMiss(ctx context.Context) context.Context {
+
log.Println("setting to false")
+
return context.WithValue(ctx, cacheHitKey, false)
+
}
+
+
func GetCacheHit(ctx context.Context) bool {
+
v := ctx.Value(cacheHitKey)
+
if v == nil {
+
return false
+
}
+
+
return v.(bool)
+
}
+
// Handler is implemented by anything that can process a DNS Request and
// produce its reply through a ResponseWriter.
type Handler interface {
// ServeDNS handles a single request; the server calls it once per decoded query.
ServeDNS(ResponseWriter, *Request)
}
···
}
// Request is what a Handler receives for one incoming DNS query.
type Request struct {
+
Context context.Context // request-scoped context; handleQuery stores the Request itself in it under contextKey("request")
RemoteAddr net.Addr // remote address handed to handleQuery for this query
Message *magna.Message // the decoded query message
}
···
ReadTimeout time.Duration
WriteTimeout time.Duration
Logger *slog.Logger
-
Cache Cache
+
Cache *MemoryCache
}
func (srv *Server) ListenAndServe() error {
···
for {
conn, err := listener.Accept()
if err != nil {
-
srv.Logger.Warn("tcp accept error:", err)
+
srv.Logger.Warn("tcp accept error:", "error", err)
continue
}
···
sizeBuffer := make([]byte, 2)
if _, err := io.ReadFull(conn, sizeBuffer); err != nil {
-
srv.Logger.Warn("tcp-error", err)
+
srv.Logger.Warn("tcp error occurred", "error", err)
return
}
size := binary.BigEndian.Uint16(sizeBuffer)
data := make([]byte, size)
if _, err := io.ReadFull(conn, data); err != nil {
-
srv.Logger.Warn("tcp-error", err)
+
srv.Logger.Warn("tcp error occurred", "error", err)
return
}
···
func (srv *Server) handleQuery(messageBuffer []byte, w ResponseWriter, remoteAddr net.Addr) {
var query magna.Message
if err := query.Decode(messageBuffer); err != nil {
-
srv.Logger.Warn("decode error", err)
+
srv.Logger.Warn("message decode error", "error", err)
return
}
···
Message: &query,
RemoteAddr: remoteAddr,
}
+
+
r.Context = context.WithValue(context.Background(), contextKey("request"), r)
srv.Handler.ServeDNS(w, r)
}
+230
pkg/metrics/clickhouse.go
···
+
package metrics
+
+
import (
+
"database/sql"
+
"fmt"
+
"log"
+
"log/slog"
+
"sync"
+
"time"
+
+
_ "github.com/ClickHouse/clickhouse-go/v2"
+
+
"code.kiri.systems/kiri/alky/pkg/config"
+
"code.kiri.systems/kiri/alky/pkg/dns"
+
)
+
+
// ClickHouseMetrics buffers query and cache metrics in memory and writes them
// to ClickHouse in batches, either when a buffer reaches the configured batch
// size or when the periodic flush loop fires.
type ClickHouseMetrics struct {
+
db *sql.DB // handle opened with the "clickhouse" database/sql driver
+
config *config.MetricsConfig // DSN, batch size, flush interval and retention period
+
queryBuffer []QueryMetric // pending per-query rows; guarded by mu
+
cacheBuffer []CacheMetric // pending cache-statistics rows; guarded by mu
+
mu sync.Mutex // protects both buffers and serialises flushes
+
stopChan chan struct{} // closed by Close to stop flushLoop
+
logger *slog.Logger // receives flush failure reports
+
}
+
+
// QueryMetric is one row of the alky_dns_queries table, describing a single
// handled DNS query.
type QueryMetric struct {
+
Timestamp time.Time // when the metric was recorded
+
InstanceID string // identifier of the reporting instance (hostID in the middleware)
+
QueryName string // QName of the first question
+
QueryType string // string form of the question's QType
+
QueryClass string // string form of the question's QClass
+
RemoteAddr string // string form of the client's address
+
ResponseCode string // string form of the message header's RCode
+
Duration int64 // handler run time in nanoseconds
+
CacheHit bool // value reported by dns.GetCacheHit for the request context
+
}
+
+
// CacheMetric is one row of the alky_dns_cache_metrics table: a timestamped
// copy of the counters found in dns.CacheStats.
type CacheMetric struct {
+
Timestamp time.Time // when the snapshot was taken
+
TotalQueries int64 // copied from CacheStats.TotalQueries
+
CacheHits int64 // copied from CacheStats.CacheHits
+
CacheMisses int64 // copied from CacheStats.CacheMisses
+
NegativeHits int64 // copied from CacheStats.NegativeHits (presumably hits on negative/NXDOMAIN entries — confirm in pkg/dns)
+
}
+
+
func NewClickHouseMetrics(config *config.MetricsConfig, logger *slog.Logger) (*ClickHouseMetrics, error) {
+
db, err := sql.Open("clickhouse", config.DSN)
+
if err != nil {
+
return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err)
+
}
+
+
m := &ClickHouseMetrics{
+
db: db,
+
config: config,
+
queryBuffer: make([]QueryMetric, 0, config.BatchSize),
+
cacheBuffer: make([]CacheMetric, 0, config.BatchSize),
+
stopChan: make(chan struct{}),
+
logger: logger,
+
}
+
+
if err := m.changeTTL(); err != nil {
+
db.Close()
+
return nil, fmt.Errorf("failed to initialize tables: %w", err)
+
}
+
+
go m.flushLoop()
+
return m, nil
+
}
+
+
func (m *ClickHouseMetrics) RecordQuery(metric QueryMetric) {
+
m.mu.Lock()
+
defer m.mu.Unlock()
+
+
m.queryBuffer = append(m.queryBuffer, metric)
+
if len(m.queryBuffer) >= m.config.BatchSize {
+
m.flush()
+
}
+
}
+
+
func (m *ClickHouseMetrics) RecordCacheMetrics(metric CacheMetric) {
+
m.mu.Lock()
+
defer m.mu.Unlock()
+
+
m.cacheBuffer = append(m.cacheBuffer, metric)
+
if len(m.cacheBuffer) >= m.config.BatchSize {
+
m.flush()
+
}
+
}
+
+
func (m *ClickHouseMetrics) RecordCacheStats(stats *dns.CacheStats) {
+
m.RecordCacheMetrics(CacheMetric{
+
Timestamp: time.Now(),
+
TotalQueries: stats.TotalQueries,
+
CacheHits: stats.CacheHits,
+
CacheMisses: stats.CacheMisses,
+
NegativeHits: stats.NegativeHits,
+
})
+
}
+
+
func (m *ClickHouseMetrics) flushLoop() {
+
ticker := time.NewTicker(m.config.FlushInterval.Duration)
+
defer ticker.Stop()
+
+
for {
+
select {
+
case <-ticker.C:
+
m.mu.Lock()
+
m.flush()
+
m.mu.Unlock()
+
case <-m.stopChan:
+
return
+
}
+
}
+
}
+
+
func (m *ClickHouseMetrics) flush() {
+
if len(m.queryBuffer) > 0 {
+
if err := m.flushQueries(); err != nil {
+
m.logger.Error("Failed to flush query metrics", "error", err)
+
}
+
m.queryBuffer = m.queryBuffer[:0]
+
}
+
+
if len(m.cacheBuffer) > 0 {
+
if err := m.flushCacheMetrics(); err != nil {
+
m.logger.Error("Failed to flush cache metrics", "error", err)
+
}
+
m.cacheBuffer = m.cacheBuffer[:0]
+
}
+
}
+
+
func (m *ClickHouseMetrics) changeTTL() error {
+
if _, err := m.db.Exec(
+
"ALTER TABLE alky_dns_queries MODIFY TTL timestamp + toIntervalSecond(?)",
+
int(m.config.RetentionPeriod.Seconds()),
+
); err != nil {
+
return fmt.Errorf("failed to update alky_dns_queries TTL: %w", err)
+
}
+
+
if _, err := m.db.Exec(
+
"ALTER TABLE alky_dns_cache_metrics MODIFY TTL timestamp + toIntervalSecond(?)",
+
int(m.config.RetentionPeriod.Seconds()),
+
); err != nil {
+
return fmt.Errorf("failed to update alky_dns_cache_metrics TTL: %w", err)
+
}
+
+
return nil
+
}
+
+
func (m *ClickHouseMetrics) flushQueries() error {
+
tx, err := m.db.Begin()
+
if err != nil {
+
return err
+
}
+
defer tx.Rollback()
+
+
stmt, err := tx.Prepare(`
+
INSERT INTO alky_dns_queries (
+
timestamp, instance_id, query_name, query_type, query_class,
+
remote_addr, response_code, duration, cache_hit
+
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+
`)
+
if err != nil {
+
return err
+
}
+
defer stmt.Close()
+
+
for _, metric := range m.queryBuffer {
+
log.Println(metric)
+
_, err := stmt.Exec(
+
metric.Timestamp,
+
metric.InstanceID,
+
metric.QueryName,
+
metric.QueryType,
+
metric.QueryClass,
+
metric.RemoteAddr,
+
metric.ResponseCode,
+
metric.Duration,
+
metric.CacheHit,
+
)
+
if err != nil {
+
return err
+
}
+
}
+
+
return tx.Commit()
+
}
+
+
func (m *ClickHouseMetrics) flushCacheMetrics() error {
+
tx, err := m.db.Begin()
+
if err != nil {
+
return err
+
}
+
defer tx.Rollback()
+
+
stmt, err := tx.Prepare(`
+
INSERT INTO alky_dns_cache_metrics (
+
timestamp, total_queries, cache_hits, cache_misses,
+
negative_hits
+
) VALUES (?, ?, ?, ?, ?)
+
`)
+
if err != nil {
+
return err
+
}
+
defer stmt.Close()
+
+
for _, metric := range m.cacheBuffer {
+
_, err := stmt.Exec(
+
metric.Timestamp,
+
metric.TotalQueries,
+
metric.CacheHits,
+
metric.CacheMisses,
+
metric.NegativeHits,
+
)
+
if err != nil {
+
return err
+
}
+
}
+
+
return tx.Commit()
+
}
+
+
func (m *ClickHouseMetrics) Close() error {
+
close(m.stopChan)
+
m.mu.Lock()
+
defer m.mu.Unlock()
+
m.flush()
+
return m.db.Close()
+
}
+60
pkg/metrics/middleware.go
···
+
package metrics
+
+
import (
+
"context"
+
"fmt"
+
"log"
+
"os"
+
"time"
+
+
"code.kiri.systems/kiri/alky/pkg/dns"
+
)
+
+
// Instance identity used as QueryMetric.InstanceID.
var (
+
hostID string // set by init: the hostname, or "<hostname>-<version>" when version is non-empty
+
version string // build version; the Justfile build recipe injects it with -ldflags "-X ...pkg/metrics.version=..."
+
)
+
+
func init() {
+
var err error
+
hostname, err := os.Hostname()
+
if err != nil {
+
hostname = "unknown"
+
}
+
+
if version != "" {
+
hostID = fmt.Sprintf("%s-%s", hostname, version)
+
} else {
+
hostID = hostname
+
}
+
}
+
+
func MetricsMiddleware(metrics *ClickHouseMetrics) func(dns.Handler) dns.Handler {
+
return func(next dns.Handler) dns.Handler {
+
return dns.HandlerFunc(func(w dns.ResponseWriter, r *dns.Request) {
+
if r.Context == nil {
+
r.Context = context.Background()
+
}
+
+
start := time.Now()
+
next.ServeDNS(w, r)
+
duration := time.Since(start)
+
+
question := r.Message.Question[0]
+
log.Println(hostID)
+
log.Println(dns.GetCacheHit(r.Context))
+
+
metrics.RecordQuery(QueryMetric{
+
Timestamp: time.Now(),
+
InstanceID: hostID,
+
QueryName: question.QName,
+
QueryType: question.QType.String(),
+
QueryClass: question.QClass.String(),
+
RemoteAddr: r.RemoteAddr.String(),
+
ResponseCode: r.Message.Header.RCode.String(),
+
Duration: duration.Nanoseconds(),
+
CacheHit: dns.GetCacheHit(r.Context),
+
})
+
})
+
}
+
}