1package clickhouse_inserter
2
3import (
4 "context"
5 "log/slog"
6 "reflect"
7 "slices"
8 "sync"
9 "time"
10
11 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
12 "github.com/prometheus/client_golang/prometheus"
13 "github.com/prometheus/client_golang/prometheus/promauto"
14)
15
// Inserter buffers events in memory and writes them to ClickHouse in batches.
//
// Events are queued by Insert and flushed through sendStream once the queue
// reaches batchSize; Close flushes whatever is still queued.
type Inserter struct {
	// conn is the ClickHouse connection on which batches are prepared.
	conn driver.Conn
	// query is the statement handed to conn.PrepareBatch for every batch.
	query string
	// mu guards queuedEvents.
	mu sync.Mutex
	// queuedEvents holds events accepted by Insert that have not been flushed yet.
	queuedEvents []any
	// batchSize is the queue length at which Insert triggers a flush.
	batchSize int
	// insertsCounter counts flushed events, labelled by "status".
	// New only sets it when a Prometheus prefix was provided.
	insertsCounter *prometheus.CounterVec
	// pendingSends is incremented while a sendStream call is running.
	// New only sets it when a Prometheus prefix was provided.
	pendingSends prometheus.Gauge
	// histogram, when non-nil, observes the duration of each sendStream call
	// in seconds, labelled with prefix.
	histogram *prometheus.HistogramVec
	// logger receives the error messages produced while sending batches.
	logger *slog.Logger
	// prefix is the Prometheus prefix passed to New; it is also used as the
	// histogram label value and as the "prefix" log attribute.
	prefix string
}
28
// Args carries the configuration consumed by New.
type Args struct {
	// Conn is the ClickHouse connection the Inserter prepares batches on.
	Conn driver.Conn
	// Query is the statement passed to Conn.PrepareBatch for every batch.
	Query string
	// BatchSize is the number of queued events at which Insert flushes.
	BatchSize int
	// PrometheusCounterPrefix is used as the Namespace of the metrics New
	// registers. When empty, New registers no metrics.
	PrometheusCounterPrefix string
	// Logger is optional; New falls back to slog.Default() when it is nil.
	Logger *slog.Logger
	// Histogram is optional; when set, the duration of every batch send is
	// observed on it with PrometheusCounterPrefix as the label value.
	Histogram *prometheus.HistogramVec
}
37
38func New(ctx context.Context, args *Args) (*Inserter, error) {
39 if args.Logger == nil {
40 args.Logger = slog.Default()
41 }
42
43 inserter := &Inserter{
44 conn: args.Conn,
45 query: args.Query,
46 mu: sync.Mutex{},
47 batchSize: args.BatchSize,
48 histogram: args.Histogram,
49 logger: args.Logger,
50 prefix: args.PrometheusCounterPrefix,
51 }
52
53 if args.PrometheusCounterPrefix != "" {
54 inserter.insertsCounter = promauto.NewCounterVec(prometheus.CounterOpts{
55 Name: "clickhouse_inserts",
56 Namespace: args.PrometheusCounterPrefix,
57 Help: "total inserts into clickhouse by status",
58 }, []string{"status"})
59
60 inserter.pendingSends = promauto.NewGauge(prometheus.GaugeOpts{
61 Name: "clickhouse_pending_sends",
62 Namespace: args.PrometheusCounterPrefix,
63 Help: "total clickhouse insertions that are in progress",
64 })
65
66 } else {
67 args.Logger.Info("no prometheus prefix provided, no metrics will be registered for this counter", "query", args.Query)
68 }
69
70 return inserter, nil
71}
72
73func (i *Inserter) Insert(ctx context.Context, e any) error {
74 i.mu.Lock()
75
76 i.queuedEvents = append(i.queuedEvents, e)
77
78 var toInsert []any
79 if len(i.queuedEvents) >= i.batchSize {
80 toInsert = slices.Clone(i.queuedEvents)
81 i.queuedEvents = nil
82 }
83
84 i.mu.Unlock()
85
86 if len(toInsert) > 0 {
87 i.sendStream(ctx, toInsert)
88 }
89
90 return nil
91}
92
93func (i *Inserter) Close(ctx context.Context) error {
94 i.mu.Lock()
95
96 var toInsert []any
97
98 if len(i.queuedEvents) > 0 {
99 toInsert = slices.Clone(i.queuedEvents)
100 i.queuedEvents = nil
101 }
102
103 i.mu.Unlock()
104
105 if len(toInsert) > 0 {
106 i.sendStream(ctx, toInsert)
107 }
108
109 return nil
110}
111
112func (i *Inserter) sendStream(ctx context.Context, toInsert []any) {
113 i.pendingSends.Inc()
114 defer i.pendingSends.Dec()
115
116 if i.histogram != nil {
117 start := time.Now()
118 defer func() {
119 i.histogram.WithLabelValues(i.prefix).Observe(time.Since(start).Seconds())
120 }()
121 }
122
123 if len(toInsert) == 0 {
124 return
125 }
126
127 status := "ok"
128 defer func() {
129 i.insertsCounter.WithLabelValues(status).Add(float64(len(toInsert)))
130 }()
131
132 batch, err := i.conn.PrepareBatch(ctx, i.query)
133 if err != nil {
134 i.logger.Error("error creating batch", "prefix", i.prefix, "error", err)
135 status = "failed"
136 return
137 }
138
139 for _, d := range toInsert {
140 var structPtr any
141 if reflect.TypeOf(d).Kind() == reflect.Ptr {
142 structPtr = d
143 } else {
144 v := reflect.ValueOf(d)
145 if v.CanAddr() {
146 structPtr = v.Addr().Addr().Interface()
147 } else {
148 ptr := reflect.New(v.Type())
149 ptr.Elem().Set(v)
150 structPtr = ptr.Interface()
151 }
152 }
153
154 if err := batch.AppendStruct(structPtr); err != nil {
155 i.logger.Error("error appending to batch", "prefix", i.prefix, "error", err)
156 }
157 }
158
159 if err := batch.Send(); err != nil {
160 status = "failed"
161 i.logger.Error("error sending batch", "prefix", i.prefix, "error", err)
162 }
163}