1package clickhouse_inserter
2
import (
	"context"
	"log/slog"
	"reflect"
	"slices"
	"sync"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)
14
15type Inserter struct {
16 conn driver.Conn
17 query string
18 mu sync.Mutex
19 queuedEvents []any
20 batchSize int
21 insertsCounter *prometheus.CounterVec
22 pendingSends prometheus.Gauge
23 histogram *prometheus.HistogramVec
24 logger *slog.Logger
25 prefix string
26}
27
28type Args struct {
29 Conn driver.Conn
30 Query string
31 BatchSize int
32 PrometheusCounterPrefix string
33 Logger *slog.Logger
34 Histogram *prometheus.HistogramVec
35}
36
37func New(ctx context.Context, args *Args) (*Inserter, error) {
38 if args.Logger == nil {
39 args.Logger = slog.Default()
40 }
41
42 inserter := &Inserter{
43 conn: args.Conn,
44 query: args.Query,
45 mu: sync.Mutex{},
46 batchSize: args.BatchSize,
47 histogram: args.Histogram,
48 logger: args.Logger,
49 prefix: args.PrometheusCounterPrefix,
50 }
51
52 if args.PrometheusCounterPrefix != "" {
53 inserter.insertsCounter = promauto.NewCounterVec(prometheus.CounterOpts{
54 Name: "clickhouse_inserts",
55 Namespace: args.PrometheusCounterPrefix,
56 Help: "total inserts into clickhouse by status",
57 }, []string{"status"})
58
59 inserter.pendingSends = promauto.NewGauge(prometheus.GaugeOpts{
60 Name: "clickhouse_pending_sends",
61 Namespace: args.PrometheusCounterPrefix,
62 Help: "total clickhouse insertions that are in progress",
63 })
64
65 } else {
66 args.Logger.Info("no prometheus prefix provided, no metrics will be registered for this counter", "query", args.Query)
67 }
68
69 return inserter, nil
70}
71
72func (i *Inserter) Insert(ctx context.Context, e any) error {
73 i.mu.Lock()
74
75 i.queuedEvents = append(i.queuedEvents, e)
76
77 var toInsert []any
78 if len(i.queuedEvents) >= i.batchSize {
79 toInsert = slices.Clone(i.queuedEvents)
80 i.queuedEvents = nil
81 }
82
83 i.mu.Unlock()
84
85 if len(toInsert) > 0 {
86 i.sendStream(ctx, toInsert)
87 }
88
89 return nil
90}
91
92func (i *Inserter) Close(ctx context.Context) error {
93 i.mu.Lock()
94
95 var toInsert []any
96
97 if len(i.queuedEvents) > 0 {
98 toInsert = slices.Clone(i.queuedEvents)
99 i.queuedEvents = nil
100 }
101
102 i.mu.Unlock()
103
104 if len(toInsert) > 0 {
105 i.sendStream(ctx, toInsert)
106 }
107
108 return nil
109}
110
111func (i *Inserter) sendStream(ctx context.Context, toInsert []any) {
112 i.pendingSends.Inc()
113 defer i.pendingSends.Dec()
114
115 if i.histogram != nil {
116 start := time.Now()
117 defer func() {
118 i.histogram.WithLabelValues(i.prefix).Observe(time.Since(start).Seconds())
119 }()
120 }
121
122 if len(toInsert) == 0 {
123 return
124 }
125
126 status := "ok"
127 defer func() {
128 i.insertsCounter.WithLabelValues(status).Add(float64(len(toInsert)))
129 }()
130
131 batch, err := i.conn.PrepareBatch(ctx, i.query)
132 if err != nil {
133 i.logger.Error("error creating batch", "prefix", i.prefix, "error", err)
134 status = "failed"
135 return
136 }
137
138 for _, d := range toInsert {
139 if err := batch.AppendStruct(&d); err != nil {
140 i.logger.Error("error appending to batch", "prefix", i.prefix, "error", err)
141 }
142 }
143
144 if err := batch.Send(); err != nil {
145 status = "failed"
146 i.logger.Error("error sending batch", "prefix", i.prefix, "error", err)
147 }
148}