// This repo has no description.
// At main · 4.0 kB · view raw
1package clickhouse_inserter 2 3import ( 4 "context" 5 "log/slog" 6 "reflect" 7 "slices" 8 "sync" 9 "time" 10 11 "github.com/ClickHouse/clickhouse-go/v2/lib/driver" 12 "github.com/prometheus/client_golang/prometheus" 13 "github.com/prometheus/client_golang/prometheus/promauto" 14 "go.uber.org/ratelimit" 15) 16 17type Inserter struct { 18 conn driver.Conn 19 query string 20 mu sync.Mutex 21 queuedEvents []any 22 batchSize int 23 insertsCounter *prometheus.CounterVec 24 pendingSends prometheus.Gauge 25 histogram *prometheus.HistogramVec 26 logger *slog.Logger 27 prefix string 28 rateLimit ratelimit.Limiter 29} 30 31type Args struct { 32 Conn driver.Conn 33 Query string 34 BatchSize int 35 PrometheusCounterPrefix string 36 Logger *slog.Logger 37 Histogram *prometheus.HistogramVec 38 RateLimit int 39} 40 41func New(ctx context.Context, args *Args) (*Inserter, error) { 42 if args.Logger == nil { 43 args.Logger = slog.Default() 44 } 45 46 inserter := &Inserter{ 47 conn: args.Conn, 48 query: args.Query, 49 mu: sync.Mutex{}, 50 batchSize: args.BatchSize, 51 histogram: args.Histogram, 52 logger: args.Logger, 53 prefix: args.PrometheusCounterPrefix, 54 } 55 56 if args.RateLimit != 0 { 57 rateLimit := ratelimit.New(args.RateLimit) 58 inserter.rateLimit = rateLimit 59 } 60 61 if args.PrometheusCounterPrefix != "" { 62 inserter.insertsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 63 Name: "clickhouse_inserts", 64 Namespace: args.PrometheusCounterPrefix, 65 Help: "total inserts into clickhouse by status", 66 }, []string{"status"}) 67 68 inserter.pendingSends = promauto.NewGauge(prometheus.GaugeOpts{ 69 Name: "clickhouse_pending_sends", 70 Namespace: args.PrometheusCounterPrefix, 71 Help: "total clickhouse insertions that are in progress", 72 }) 73 74 } else { 75 args.Logger.Info("no prometheus prefix provided, no metrics will be registered for this counter", "query", args.Query) 76 } 77 78 return inserter, nil 79} 80 81func (i *Inserter) Insert(ctx context.Context, e 
any) error { 82 i.mu.Lock() 83 84 i.queuedEvents = append(i.queuedEvents, e) 85 86 var toInsert []any 87 if len(i.queuedEvents) >= i.batchSize { 88 toInsert = slices.Clone(i.queuedEvents) 89 i.queuedEvents = nil 90 } 91 92 i.mu.Unlock() 93 94 if len(toInsert) > 0 { 95 i.sendStream(ctx, toInsert) 96 } 97 98 return nil 99} 100 101func (i *Inserter) Close(ctx context.Context) error { 102 i.mu.Lock() 103 104 var toInsert []any 105 106 if len(i.queuedEvents) > 0 { 107 toInsert = slices.Clone(i.queuedEvents) 108 i.queuedEvents = nil 109 } 110 111 i.mu.Unlock() 112 113 if len(toInsert) > 0 { 114 i.sendStream(ctx, toInsert) 115 } 116 117 return nil 118} 119 120func (i *Inserter) sendStream(ctx context.Context, toInsert []any) { 121 if i.pendingSends != nil { 122 i.pendingSends.Inc() 123 defer i.pendingSends.Dec() 124 } 125 126 if i.histogram != nil { 127 start := time.Now() 128 defer func() { 129 i.histogram.WithLabelValues(i.prefix).Observe(time.Since(start).Seconds()) 130 }() 131 } 132 133 if len(toInsert) == 0 { 134 return 135 } 136 137 status := "ok" 138 if i.insertsCounter != nil { 139 defer func() { 140 i.insertsCounter.WithLabelValues(status).Add(float64(len(toInsert))) 141 }() 142 } 143 144 batch, err := i.conn.PrepareBatch(ctx, i.query) 145 if err != nil { 146 i.logger.Error("error creating batch", "prefix", i.prefix, "error", err) 147 status = "failed" 148 return 149 } 150 151 for _, d := range toInsert { 152 var structPtr any 153 if reflect.TypeOf(d).Kind() == reflect.Ptr { 154 structPtr = d 155 } else { 156 v := reflect.ValueOf(d) 157 if v.CanAddr() { 158 structPtr = v.Addr().Addr().Interface() 159 } else { 160 ptr := reflect.New(v.Type()) 161 ptr.Elem().Set(v) 162 structPtr = ptr.Interface() 163 } 164 } 165 166 if err := batch.AppendStruct(structPtr); err != nil { 167 i.logger.Error("error appending to batch", "prefix", i.prefix, "error", err) 168 } 169 } 170 171 if i.rateLimit != nil { 172 i.rateLimit.Take() 173 } 174 175 if err := batch.Send(); err != nil { 
176 status = "failed" 177 i.logger.Error("error sending batch", "prefix", i.prefix, "error", err) 178 } 179}