this repo has no description
1package clickhouse_inserter 2 3import ( 4 "context" 5 "log/slog" 6 "slices" 7 "sync" 8 "time" 9 10 "github.com/ClickHouse/clickhouse-go/v2/lib/driver" 11 "github.com/prometheus/client_golang/prometheus" 12 "github.com/prometheus/client_golang/prometheus/promauto" 13) 14 15type Inserter struct { 16 conn driver.Conn 17 query string 18 mu sync.Mutex 19 queuedEvents []any 20 batchSize int 21 insertsCounter *prometheus.CounterVec 22 pendingSends prometheus.Gauge 23 histogram *prometheus.HistogramVec 24 logger *slog.Logger 25 prefix string 26} 27 28type Args struct { 29 Conn driver.Conn 30 Query string 31 BatchSize int 32 PrometheusCounterPrefix string 33 Logger *slog.Logger 34 Histogram *prometheus.HistogramVec 35} 36 37func New(ctx context.Context, args *Args) (*Inserter, error) { 38 if args.Logger == nil { 39 args.Logger = slog.Default() 40 } 41 42 inserter := &Inserter{ 43 conn: args.Conn, 44 query: args.Query, 45 mu: sync.Mutex{}, 46 batchSize: args.BatchSize, 47 histogram: args.Histogram, 48 logger: args.Logger, 49 prefix: args.PrometheusCounterPrefix, 50 } 51 52 if args.PrometheusCounterPrefix != "" { 53 inserter.insertsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 54 Name: "clickhouse_inserts", 55 Namespace: args.PrometheusCounterPrefix, 56 Help: "total inserts into clickhouse by status", 57 }, []string{"status"}) 58 59 inserter.pendingSends = promauto.NewGauge(prometheus.GaugeOpts{ 60 Name: "clickhouse_pending_sends", 61 Namespace: args.PrometheusCounterPrefix, 62 Help: "total clickhouse insertions that are in progress", 63 }) 64 65 } else { 66 args.Logger.Info("no prometheus prefix provided, no metrics will be registered for this counter", "query", args.Query) 67 } 68 69 return inserter, nil 70} 71 72func (i *Inserter) Insert(ctx context.Context, e any) error { 73 i.mu.Lock() 74 75 i.queuedEvents = append(i.queuedEvents, e) 76 77 var toInsert []any 78 if len(i.queuedEvents) >= i.batchSize { 79 toInsert = slices.Clone(i.queuedEvents) 80 i.queuedEvents = nil 81 } 82 83 i.mu.Unlock() 84 85 if len(toInsert) > 0 { 86 i.sendStream(ctx, toInsert) 87 } 88 89 return nil 90} 91 92func (i *Inserter) Close(ctx context.Context) error { 93 i.mu.Lock() 94 95 var toInsert []any 96 97 if len(i.queuedEvents) > 0 { 98 toInsert = slices.Clone(i.queuedEvents) 99 i.queuedEvents = nil 100 } 101 102 i.mu.Unlock() 103 104 if len(toInsert) > 0 { 105 i.sendStream(ctx, toInsert) 106 } 107 108 return nil 109} 110 111func (i *Inserter) sendStream(ctx context.Context, toInsert []any) { 112 i.pendingSends.Inc() 113 defer i.pendingSends.Dec() 114 115 if i.histogram != nil { 116 start := time.Now() 117 defer func() { 118 i.histogram.WithLabelValues(i.prefix).Observe(time.Since(start).Seconds()) 119 }() 120 } 121 122 if len(toInsert) == 0 { 123 return 124 } 125 126 status := "ok" 127 defer func() { 128 i.insertsCounter.WithLabelValues(status).Add(float64(len(toInsert))) 129 }() 130 131 batch, err := i.conn.PrepareBatch(ctx, i.query) 132 if err != nil { 133 i.logger.Error("error creating batch", "prefix", i.prefix, "error", err) 134 status = "failed" 135 return 136 } 137 138 for _, d := range toInsert { 139 if err := batch.AppendStruct(&d); err != nil { 140 i.logger.Error("error appending to batch", "prefix", i.prefix, "error", err) 141 } 142 } 143 144 if err := batch.Send(); err != nil { 145 status = "failed" 146 i.logger.Error("error sending batch", "prefix", i.prefix, "error", err) 147 } 148}