this repo has no description
1package photocopy 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "net/http" 8 "sync" 9 "time" 10 11 "github.com/ClickHouse/clickhouse-go/v2" 12 "github.com/ClickHouse/clickhouse-go/v2/lib/driver" 13 "github.com/haileyok/photocopy/clickhouse_inserter" 14 "github.com/prometheus/client_golang/prometheus" 15 "github.com/prometheus/client_golang/prometheus/promauto" 16 "github.com/prometheus/client_golang/prometheus/promhttp" 17) 18 19type Photocopy struct { 20 logger *slog.Logger 21 wg sync.WaitGroup 22 23 relayHost string 24 cursor string 25 cursorFile string 26 metricsAddr string 27 28 inserters *Inserters 29 30 plcScraper *PLCScraper 31 32 ratelimitBypassKey string 33 34 conn driver.Conn 35} 36 37type Inserters struct { 38 followsInserter *clickhouse_inserter.Inserter 39 interactionsInserter *clickhouse_inserter.Inserter 40 postsInserter *clickhouse_inserter.Inserter 41 plcInserter *clickhouse_inserter.Inserter 42 recordsInserter *clickhouse_inserter.Inserter 43 deletesInserter *clickhouse_inserter.Inserter 44} 45 46type Args struct { 47 Logger *slog.Logger 48 RelayHost string 49 MetricsAddr string 50 CursorFile string 51 PLCScraperCursorFile string 52 ClickhouseAddr string 53 ClickhouseDatabase string 54 ClickhouseUser string 55 ClickhousePass string 56 RatelimitBypassKey string 57} 58 59func New(ctx context.Context, args *Args) (*Photocopy, error) { 60 conn, err := clickhouse.Open(&clickhouse.Options{ 61 Addr: []string{args.ClickhouseAddr}, 62 Auth: clickhouse.Auth{ 63 Database: args.ClickhouseDatabase, 64 Username: args.ClickhouseUser, 65 Password: args.ClickhousePass, 66 }, 67 }) 68 if err != nil { 69 return nil, err 70 } 71 72 p := &Photocopy{ 73 logger: args.Logger, 74 metricsAddr: args.MetricsAddr, 75 relayHost: args.RelayHost, 76 wg: sync.WaitGroup{}, 77 cursorFile: args.CursorFile, 78 ratelimitBypassKey: args.RatelimitBypassKey, 79 conn: conn, 80 } 81 82 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{ 83 Name: "photocopy_inserts_time", 84 Help: "histogram of photocopy inserts", 85 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20), 86 }, []string{"type"}) 87 88 fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 89 PrometheusCounterPrefix: "photocopy_follows", 90 Histogram: insertionsHist, 91 BatchSize: 250_000, 92 Logger: p.logger, 93 Conn: conn, 94 Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)", 95 RateLimit: 3, 96 }) 97 if err != nil { 98 return nil, err 99 } 100 101 pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 102 PrometheusCounterPrefix: "photocopy_posts", 103 Histogram: insertionsHist, 104 BatchSize: 250_000, 105 Logger: p.logger, 106 Conn: conn, 107 Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did, lang)", 108 RateLimit: 3, 109 }) 110 if err != nil { 111 return nil, err 112 } 113 114 ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 115 PrometheusCounterPrefix: "photocopy_interactions", 116 Histogram: insertionsHist, 117 BatchSize: 250_000, 118 Logger: p.logger, 119 Conn: conn, 120 Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)", 121 RateLimit: 3, 122 }) 123 if err != nil { 124 return nil, err 125 } 126 127 ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 128 PrometheusCounterPrefix: "photocopy_records", 129 Histogram: insertionsHist, 130 BatchSize: 250_000, 131 Logger: p.logger, 132 Conn: conn, 133 Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)", 134 RateLimit: 3, 135 }) 136 if err != nil { 137 return nil, err 138 } 139 140 di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 141 PrometheusCounterPrefix: "photocopy_deletes", 142 Histogram: insertionsHist, 143 BatchSize: 250_000, 144 Logger: p.logger, 145 Conn: conn, 146 Query: "INSERT INTO delete (did, rkey, created_at)", 147 RateLimit: 3, 148 }) 149 if err != nil { 150 return nil, err 151 } 152 153 is := &Inserters{ 154 followsInserter: fi, 155 postsInserter: pi, 156 interactionsInserter: ii, 157 recordsInserter: ri, 158 deletesInserter: di, 159 } 160 161 p.inserters = is 162 163 plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 164 PrometheusCounterPrefix: "photocopy_plc_entries", 165 Histogram: insertionsHist, 166 BatchSize: 100, 167 Logger: args.Logger, 168 Conn: conn, 169 Query: `INSERT INTO plc ( 170 did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type, 171 plc_op_services, plc_op_also_known_as, plc_op_rotation_keys, 172 plc_tomb_sig, plc_tomb_prev, plc_tomb_type, 173 legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle, 174 legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key 175 )`, 176 }) 177 if err != nil { 178 return nil, err 179 } 180 181 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{ 182 Logger: p.logger, 183 Inserter: plci, 184 CursorFile: args.PLCScraperCursorFile, 185 }) 186 if err != nil { 187 return nil, err 188 } 189 190 p.inserters.plcInserter = plci 191 p.plcScraper = plcs 192 193 return p, nil 194} 195 196func (p *Photocopy) Run(baseCtx context.Context, withBackfill bool) error { 197 ctx, cancel := context.WithCancel(baseCtx) 198 199 metricsServer := http.NewServeMux() 200 metricsServer.Handle("/metrics", promhttp.Handler()) 201 202 go func() { 203 p.logger.Info("Starting metrics server") 204 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil { 205 p.logger.Error("metrics server failed", "error", err) 206 } 207 }() 208 209 go func(ctx context.Context, cancel context.CancelFunc) { 210 p.logger.Info("starting relay", "relayHost", p.relayHost) 211 if err := p.startConsumer(ctx, cancel); err != nil { 212 panic(fmt.Errorf("failed to start consumer: %w", err)) 213 } 214 }(ctx, cancel) 215 216 go func(ctx context.Context) { 217 if err := p.plcScraper.Run(ctx); err != nil { 218 panic(fmt.Errorf("failed to start plc scraper: %w", err)) 219 } 220 }(ctx) 221 222 if withBackfill { 223 go func(ctx context.Context) { 224 if err := p.runBackfiller(ctx); err != nil { 225 panic(fmt.Errorf("error starting backfiller: %w", err)) 226 } 227 }(ctx) 228 } 229 230 <-ctx.Done() 231 232 if p.inserters != nil { 233 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) 234 235 p.logger.Info("stopping inserters") 236 237 if p.inserters.followsInserter != nil { 238 p.wg.Add(1) 239 go func() { 240 defer p.wg.Done() 241 if err := p.inserters.followsInserter.Close(ctx); err != nil { 242 p.logger.Error("failed to close follows inserter", "error", err) 243 return 244 } 245 p.logger.Info("follows inserter closed") 246 }() 247 } 248 249 if p.inserters.interactionsInserter != nil { 250 p.wg.Add(1) 251 go func() { 252 defer p.wg.Done() 253 if err := p.inserters.interactionsInserter.Close(ctx); err != nil { 254 p.logger.Error("failed to close interactions inserter", "error", err) 255 return 256 } 257 p.logger.Info("interactions inserter closed") 258 }() 259 } 260 261 if p.inserters.postsInserter != nil { 262 p.wg.Add(1) 263 go func() { 264 defer p.wg.Done() 265 if err := p.inserters.postsInserter.Close(ctx); err != nil { 266 p.logger.Error("failed to close posts inserter", "error", err) 267 return 268 } 269 p.logger.Info("posts inserter closed") 270 }() 271 } 272 273 if p.inserters.recordsInserter != nil { 274 p.wg.Add(1) 275 go func() { 276 defer p.wg.Done() 277 if err := p.inserters.recordsInserter.Close(ctx); err != nil { 278 p.logger.Error("failed to close records inserter", "error", err) 279 return 280 } 281 p.logger.Info("records inserter closed") 282 }() 283 } 284 285 if p.inserters.deletesInserter != nil { 286 p.wg.Add(1) 287 go func() { 288 defer p.wg.Done() 289 if err := p.inserters.deletesInserter.Close(ctx); err != nil { 290 p.logger.Error("failed to close deletes inserter", "error", err) 291 return 292 } 293 p.logger.Info("deletes inserter closed") 294 }() 295 } 296 297 if p.inserters.plcInserter != nil { 298 p.wg.Add(1) 299 go func() { 300 defer p.wg.Done() 301 if err := p.inserters.plcInserter.Close(ctx); err != nil { 302 p.logger.Error("failed to close plc inserter", "error", err) 303 return 304 } 305 p.logger.Info("plc inserter closed") 306 }() 307 } 308 309 p.wg.Wait() 310 311 cancel() 312 313 p.logger.Info("inserters closed") 314 } 315 316 return nil 317}