this repo has no description
1package photocopy 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "net/http" 8 "sync" 9 "time" 10 11 "github.com/ClickHouse/clickhouse-go/v2" 12 "github.com/ClickHouse/clickhouse-go/v2/lib/driver" 13 "github.com/haileyok/photocopy/clickhouse_inserter" 14 "github.com/prometheus/client_golang/prometheus" 15 "github.com/prometheus/client_golang/prometheus/promauto" 16 "github.com/prometheus/client_golang/prometheus/promhttp" 17) 18 19type Photocopy struct { 20 logger *slog.Logger 21 wg sync.WaitGroup 22 23 relayHost string 24 cursor string 25 cursorFile string 26 metricsAddr string 27 28 inserters *Inserters 29 30 plcScraper *PLCScraper 31 32 ratelimitBypassKey string 33 34 conn driver.Conn 35 36 nervanaClient *http.Client 37 nervanaEndpoint string 38 nervanaApiKey string 39} 40 41type Inserters struct { 42 followsInserter *clickhouse_inserter.Inserter 43 interactionsInserter *clickhouse_inserter.Inserter 44 postsInserter *clickhouse_inserter.Inserter 45 plcInserter *clickhouse_inserter.Inserter 46 recordsInserter *clickhouse_inserter.Inserter 47 deletesInserter *clickhouse_inserter.Inserter 48 labelsInserter *clickhouse_inserter.Inserter 49} 50 51type Args struct { 52 Logger *slog.Logger 53 RelayHost string 54 MetricsAddr string 55 CursorFile string 56 PLCScraperCursorFile string 57 ClickhouseAddr string 58 ClickhouseDatabase string 59 ClickhouseUser string 60 ClickhousePass string 61 RatelimitBypassKey string 62 NervanaEndpoint string 63 NervanaApiKey string 64} 65 66func New(ctx context.Context, args *Args) (*Photocopy, error) { 67 conn, err := clickhouse.Open(&clickhouse.Options{ 68 Addr: []string{args.ClickhouseAddr}, 69 Auth: clickhouse.Auth{ 70 Database: args.ClickhouseDatabase, 71 Username: args.ClickhouseUser, 72 Password: args.ClickhousePass, 73 }, 74 }) 75 if err != nil { 76 return nil, err 77 } 78 79 p := &Photocopy{ 80 logger: args.Logger, 81 metricsAddr: args.MetricsAddr, 82 relayHost: args.RelayHost, 83 wg: sync.WaitGroup{}, 84 cursorFile: args.CursorFile, 85 ratelimitBypassKey: args.RatelimitBypassKey, 86 conn: conn, 87 } 88 89 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{ 90 Name: "photocopy_inserts_time", 91 Help: "histogram of photocopy inserts", 92 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20), 93 }, []string{"type"}) 94 95 fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 96 PrometheusCounterPrefix: "photocopy_follows", 97 Histogram: insertionsHist, 98 BatchSize: 500, 99 Logger: p.logger, 100 Conn: conn, 101 Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)", 102 RateLimit: 3, 103 }) 104 if err != nil { 105 return nil, err 106 } 107 108 pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 109 PrometheusCounterPrefix: "photocopy_posts", 110 Histogram: insertionsHist, 111 BatchSize: 300, 112 Logger: p.logger, 113 Conn: conn, 114 Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did, lang)", 115 RateLimit: 3, 116 }) 117 if err != nil { 118 return nil, err 119 } 120 121 ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 122 PrometheusCounterPrefix: "photocopy_interactions", 123 Histogram: insertionsHist, 124 BatchSize: 1000, 125 Logger: p.logger, 126 Conn: conn, 127 Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)", 128 RateLimit: 3, 129 }) 130 if err != nil { 131 return nil, err 132 } 133 134 ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 135 PrometheusCounterPrefix: "photocopy_records", 136 Histogram: insertionsHist, 137 BatchSize: 2500, 138 Logger: p.logger, 139 Conn: conn, 140 Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)", 141 RateLimit: 3, 142 }) 143 if err != nil { 144 return nil, err 145 } 146 147 di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 148 PrometheusCounterPrefix: "photocopy_deletes", 149 Histogram: insertionsHist, 150 BatchSize: 500, 151 Logger: p.logger, 152 Conn: conn, 153 Query: "INSERT INTO delete (did, rkey, created_at)", 154 RateLimit: 3, 155 }) 156 if err != nil { 157 return nil, err 158 } 159 160 li, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 161 PrometheusCounterPrefix: "photocopy_labels", 162 Histogram: insertionsHist, 163 BatchSize: 100, 164 Logger: p.logger, 165 Conn: conn, 166 Query: "INSERT INTO post_label (did, rkey, text, label, entity_id, description, topic, created_at)", 167 RateLimit: 3, 168 }) 169 if err != nil { 170 return nil, err 171 } 172 173 is := &Inserters{ 174 followsInserter: fi, 175 postsInserter: pi, 176 interactionsInserter: ii, 177 recordsInserter: ri, 178 deletesInserter: di, 179 labelsInserter: li, 180 } 181 182 p.inserters = is 183 184 plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 185 PrometheusCounterPrefix: "photocopy_plc_entries", 186 Histogram: insertionsHist, 187 BatchSize: 100, 188 Logger: args.Logger, 189 Conn: conn, 190 Query: `INSERT INTO plc ( 191 did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type, 192 plc_op_services, plc_op_also_known_as, plc_op_rotation_keys, 193 plc_tomb_sig, plc_tomb_prev, plc_tomb_type, 194 legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle, 195 legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key 196 )`, 197 }) 198 if err != nil { 199 return nil, err 200 } 201 202 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{ 203 Logger: p.logger, 204 Inserter: plci, 205 CursorFile: args.PLCScraperCursorFile, 206 }) 207 if err != nil { 208 return nil, err 209 } 210 211 p.inserters.plcInserter = plci 212 p.plcScraper = plcs 213 214 if args.NervanaApiKey != "" && args.NervanaEndpoint != "" { 215 p.nervanaClient = &http.Client{ 216 Timeout: 5 * time.Second, 217 } 218 p.nervanaEndpoint = args.NervanaEndpoint 219 p.nervanaApiKey = args.NervanaApiKey 220 } 221 222 return p, nil 223} 224 225func (p *Photocopy) Run(baseCtx context.Context, withBackfill bool) error { 226 ctx, cancel := context.WithCancel(baseCtx) 227 228 metricsServer := http.NewServeMux() 229 metricsServer.Handle("/metrics", promhttp.Handler()) 230 231 go func() { 232 p.logger.Info("Starting metrics server") 233 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil { 234 p.logger.Error("metrics server failed", "error", err) 235 } 236 }() 237 238 go func(ctx context.Context, cancel context.CancelFunc) { 239 p.logger.Info("starting relay", "relayHost", p.relayHost) 240 if err := p.startConsumer(ctx, cancel); err != nil { 241 panic(fmt.Errorf("failed to start consumer: %w", err)) 242 } 243 }(ctx, cancel) 244 245 go func(ctx context.Context) { 246 if err := p.plcScraper.Run(ctx); err != nil { 247 panic(fmt.Errorf("failed to start plc scraper: %w", err)) 248 } 249 }(ctx) 250 251 if withBackfill { 252 go func(ctx context.Context) { 253 if err := p.runBackfiller(ctx); err != nil { 254 panic(fmt.Errorf("error starting backfiller: %w", err)) 255 } 256 }(ctx) 257 } 258 259 <-ctx.Done() 260 261 if p.inserters != nil { 262 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) 263 264 p.logger.Info("stopping inserters") 265 266 if p.inserters.followsInserter != nil { 267 p.wg.Add(1) 268 go func() { 269 defer p.wg.Done() 270 if err := p.inserters.followsInserter.Close(ctx); err != nil { 271 p.logger.Error("failed to close follows inserter", "error", err) 272 return 273 } 274 p.logger.Info("follows inserter closed") 275 }() 276 } 277 278 if p.inserters.interactionsInserter != nil { 279 p.wg.Add(1) 280 go func() { 281 defer p.wg.Done() 282 if err := p.inserters.interactionsInserter.Close(ctx); err != nil { 283 p.logger.Error("failed to close interactions inserter", "error", err) 284 return 285 } 286 p.logger.Info("interactions inserter closed") 287 }() 288 } 289 290 if p.inserters.postsInserter != nil { 291 p.wg.Add(1) 292 go func() { 293 defer p.wg.Done() 294 if err := p.inserters.postsInserter.Close(ctx); err != nil { 295 p.logger.Error("failed to close posts inserter", "error", err) 296 return 297 } 298 p.logger.Info("posts inserter closed") 299 }() 300 } 301 302 if p.inserters.recordsInserter != nil { 303 p.wg.Add(1) 304 go func() { 305 defer p.wg.Done() 306 if err := p.inserters.recordsInserter.Close(ctx); err != nil { 307 p.logger.Error("failed to close records inserter", "error", err) 308 return 309 } 310 p.logger.Info("records inserter closed") 311 }() 312 } 313 314 if p.inserters.deletesInserter != nil { 315 p.wg.Add(1) 316 go func() { 317 defer p.wg.Done() 318 if err := p.inserters.deletesInserter.Close(ctx); err != nil { 319 p.logger.Error("failed to close deletes inserter", "error", err) 320 return 321 } 322 p.logger.Info("deletes inserter closed") 323 }() 324 } 325 326 if p.inserters.plcInserter != nil { 327 p.wg.Add(1) 328 go func() { 329 defer p.wg.Done() 330 if err := p.inserters.plcInserter.Close(ctx); err != nil { 331 p.logger.Error("failed to close plc inserter", "error", err) 332 return 333 } 334 p.logger.Info("plc inserter closed") 335 }() 336 } 337 338 p.wg.Wait() 339 340 cancel() 341 342 p.logger.Info("inserters closed") 343 } 344 345 return nil 346}