this repo has no description
1package clickhouse_inserter 2 3import ( 4 "context" 5 "log/slog" 6 "reflect" 7 "slices" 8 "sync" 9 "time" 10 11 "github.com/ClickHouse/clickhouse-go/v2/lib/driver" 12 "github.com/prometheus/client_golang/prometheus" 13 "github.com/prometheus/client_golang/prometheus/promauto" 14) 15 16type Inserter struct { 17 conn driver.Conn 18 query string 19 mu sync.Mutex 20 queuedEvents []any 21 batchSize int 22 insertsCounter *prometheus.CounterVec 23 pendingSends prometheus.Gauge 24 histogram *prometheus.HistogramVec 25 logger *slog.Logger 26 prefix string 27} 28 29type Args struct { 30 Conn driver.Conn 31 Query string 32 BatchSize int 33 PrometheusCounterPrefix string 34 Logger *slog.Logger 35 Histogram *prometheus.HistogramVec 36} 37 38func New(ctx context.Context, args *Args) (*Inserter, error) { 39 if args.Logger == nil { 40 args.Logger = slog.Default() 41 } 42 43 inserter := &Inserter{ 44 conn: args.Conn, 45 query: args.Query, 46 mu: sync.Mutex{}, 47 batchSize: args.BatchSize, 48 histogram: args.Histogram, 49 logger: args.Logger, 50 prefix: args.PrometheusCounterPrefix, 51 } 52 53 if args.PrometheusCounterPrefix != "" { 54 inserter.insertsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 55 Name: "clickhouse_inserts", 56 Namespace: args.PrometheusCounterPrefix, 57 Help: "total inserts into clickhouse by status", 58 }, []string{"status"}) 59 60 inserter.pendingSends = promauto.NewGauge(prometheus.GaugeOpts{ 61 Name: "clickhouse_pending_sends", 62 Namespace: args.PrometheusCounterPrefix, 63 Help: "total clickhouse insertions that are in progress", 64 }) 65 66 } else { 67 args.Logger.Info("no prometheus prefix provided, no metrics will be registered for this counter", "query", args.Query) 68 } 69 70 return inserter, nil 71} 72 73func (i *Inserter) Insert(ctx context.Context, e any) error { 74 i.mu.Lock() 75 76 i.queuedEvents = append(i.queuedEvents, e) 77 78 var toInsert []any 79 if len(i.queuedEvents) >= i.batchSize { 80 toInsert = slices.Clone(i.queuedEvents) 81 i.queuedEvents = nil 82 } 83 84 i.mu.Unlock() 85 86 if len(toInsert) > 0 { 87 i.sendStream(ctx, toInsert) 88 } 89 90 return nil 91} 92 93func (i *Inserter) Close(ctx context.Context) error { 94 i.mu.Lock() 95 96 var toInsert []any 97 98 if len(i.queuedEvents) > 0 { 99 toInsert = slices.Clone(i.queuedEvents) 100 i.queuedEvents = nil 101 } 102 103 i.mu.Unlock() 104 105 if len(toInsert) > 0 { 106 i.sendStream(ctx, toInsert) 107 } 108 109 return nil 110} 111 112func (i *Inserter) sendStream(ctx context.Context, toInsert []any) { 113 i.pendingSends.Inc() 114 defer i.pendingSends.Dec() 115 116 if i.histogram != nil { 117 start := time.Now() 118 defer func() { 119 i.histogram.WithLabelValues(i.prefix).Observe(time.Since(start).Seconds()) 120 }() 121 } 122 123 if len(toInsert) == 0 { 124 return 125 } 126 127 status := "ok" 128 defer func() { 129 i.insertsCounter.WithLabelValues(status).Add(float64(len(toInsert))) 130 }() 131 132 batch, err := i.conn.PrepareBatch(ctx, i.query) 133 if err != nil { 134 i.logger.Error("error creating batch", "prefix", i.prefix, "error", err) 135 status = "failed" 136 return 137 } 138 139 for _, d := range toInsert { 140 var structPtr any 141 if reflect.TypeOf(d).Kind() == reflect.Ptr { 142 structPtr = d 143 } else { 144 v := reflect.ValueOf(d) 145 if v.CanAddr() { 146 structPtr = v.Addr().Addr().Interface() 147 } else { 148 ptr := reflect.New(v.Type()) 149 ptr.Elem().Set(v) 150 structPtr = ptr.Interface() 151 } 152 } 153 154 if err := batch.AppendStruct(structPtr); err != nil { 155 i.logger.Error("error appending to batch", "prefix", i.prefix, "error", err) 156 } 157 } 158 159 if err := batch.Send(); err != nil { 160 status = "failed" 161 i.logger.Error("error sending batch", "prefix", i.prefix, "error", err) 162 } 163}