1package clickhouse_inserter
2
3import (
4 "context"
5 "log/slog"
6 "reflect"
7 "slices"
8 "sync"
9 "time"
10
11 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
12 "github.com/prometheus/client_golang/prometheus"
13 "github.com/prometheus/client_golang/prometheus/promauto"
14 "go.uber.org/ratelimit"
15)
16
17type Inserter struct {
18 conn driver.Conn
19 query string
20 mu sync.Mutex
21 queuedEvents []any
22 batchSize int
23 insertsCounter *prometheus.CounterVec
24 pendingSends prometheus.Gauge
25 histogram *prometheus.HistogramVec
26 logger *slog.Logger
27 prefix string
28 rateLimit ratelimit.Limiter
29}
30
31type Args struct {
32 Conn driver.Conn
33 Query string
34 BatchSize int
35 PrometheusCounterPrefix string
36 Logger *slog.Logger
37 Histogram *prometheus.HistogramVec
38 RateLimit int
39}
40
41func New(ctx context.Context, args *Args) (*Inserter, error) {
42 if args.Logger == nil {
43 args.Logger = slog.Default()
44 }
45
46 inserter := &Inserter{
47 conn: args.Conn,
48 query: args.Query,
49 mu: sync.Mutex{},
50 batchSize: args.BatchSize,
51 histogram: args.Histogram,
52 logger: args.Logger,
53 prefix: args.PrometheusCounterPrefix,
54 }
55
56 if args.RateLimit != 0 {
57 rateLimit := ratelimit.New(args.RateLimit)
58 inserter.rateLimit = rateLimit
59 }
60
61 if args.PrometheusCounterPrefix != "" {
62 inserter.insertsCounter = promauto.NewCounterVec(prometheus.CounterOpts{
63 Name: "clickhouse_inserts",
64 Namespace: args.PrometheusCounterPrefix,
65 Help: "total inserts into clickhouse by status",
66 }, []string{"status"})
67
68 inserter.pendingSends = promauto.NewGauge(prometheus.GaugeOpts{
69 Name: "clickhouse_pending_sends",
70 Namespace: args.PrometheusCounterPrefix,
71 Help: "total clickhouse insertions that are in progress",
72 })
73
74 } else {
75 args.Logger.Info("no prometheus prefix provided, no metrics will be registered for this counter", "query", args.Query)
76 }
77
78 return inserter, nil
79}
80
81func (i *Inserter) Insert(ctx context.Context, e any) error {
82 i.mu.Lock()
83
84 i.queuedEvents = append(i.queuedEvents, e)
85
86 var toInsert []any
87 if len(i.queuedEvents) >= i.batchSize {
88 toInsert = slices.Clone(i.queuedEvents)
89 i.queuedEvents = nil
90 }
91
92 i.mu.Unlock()
93
94 if len(toInsert) > 0 {
95 i.sendStream(ctx, toInsert)
96 }
97
98 return nil
99}
100
101func (i *Inserter) Close(ctx context.Context) error {
102 i.mu.Lock()
103
104 var toInsert []any
105
106 if len(i.queuedEvents) > 0 {
107 toInsert = slices.Clone(i.queuedEvents)
108 i.queuedEvents = nil
109 }
110
111 i.mu.Unlock()
112
113 if len(toInsert) > 0 {
114 i.sendStream(ctx, toInsert)
115 }
116
117 return nil
118}
119
120func (i *Inserter) sendStream(ctx context.Context, toInsert []any) {
121 if i.pendingSends != nil {
122 i.pendingSends.Inc()
123 defer i.pendingSends.Dec()
124 }
125
126 if i.histogram != nil {
127 start := time.Now()
128 defer func() {
129 i.histogram.WithLabelValues(i.prefix).Observe(time.Since(start).Seconds())
130 }()
131 }
132
133 if len(toInsert) == 0 {
134 return
135 }
136
137 status := "ok"
138 if i.insertsCounter != nil {
139 defer func() {
140 i.insertsCounter.WithLabelValues(status).Add(float64(len(toInsert)))
141 }()
142 }
143
144 batch, err := i.conn.PrepareBatch(ctx, i.query)
145 if err != nil {
146 i.logger.Error("error creating batch", "prefix", i.prefix, "error", err)
147 status = "failed"
148 return
149 }
150
151 for _, d := range toInsert {
152 var structPtr any
153 if reflect.TypeOf(d).Kind() == reflect.Ptr {
154 structPtr = d
155 } else {
156 v := reflect.ValueOf(d)
157 if v.CanAddr() {
158 structPtr = v.Addr().Addr().Interface()
159 } else {
160 ptr := reflect.New(v.Type())
161 ptr.Elem().Set(v)
162 structPtr = ptr.Interface()
163 }
164 }
165
166 if err := batch.AppendStruct(structPtr); err != nil {
167 i.logger.Error("error appending to batch", "prefix", i.prefix, "error", err)
168 }
169 }
170
171 if i.rateLimit != nil {
172 i.rateLimit.Take()
173 }
174
175 if err := batch.Send(); err != nil {
176 status = "failed"
177 i.logger.Error("error sending batch", "prefix", i.prefix, "error", err)
178 }
179}