this repo has no description
at main 9.4 kB view raw
1package photocopy 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "net/http" 8 "sync" 9 "time" 10 11 "github.com/ClickHouse/clickhouse-go/v2" 12 "github.com/ClickHouse/clickhouse-go/v2/lib/driver" 13 "github.com/haileyok/photocopy/clickhouse_inserter" 14 "github.com/haileyok/photocopy/nervana" 15 "github.com/prometheus/client_golang/prometheus" 16 "github.com/prometheus/client_golang/prometheus/promauto" 17 "github.com/prometheus/client_golang/prometheus/promhttp" 18) 19 20type Photocopy struct { 21 logger *slog.Logger 22 wg sync.WaitGroup 23 24 relayHost string 25 cursor string 26 cursorFile string 27 metricsAddr string 28 29 inserters *Inserters 30 31 plcScraper *PLCScraper 32 33 ratelimitBypassKey string 34 35 conn driver.Conn 36 37 nervanaClient *nervana.Client 38 nervanaEndpoint string 39 nervanaApiKey string 40} 41 42type Inserters struct { 43 followsInserter *clickhouse_inserter.Inserter 44 interactionsInserter *clickhouse_inserter.Inserter 45 postsInserter *clickhouse_inserter.Inserter 46 plcInserter *clickhouse_inserter.Inserter 47 recordsInserter *clickhouse_inserter.Inserter 48 deletesInserter *clickhouse_inserter.Inserter 49 labelsInserter *clickhouse_inserter.Inserter 50} 51 52type Args struct { 53 Logger *slog.Logger 54 RelayHost string 55 MetricsAddr string 56 CursorFile string 57 PLCScraperCursorFile string 58 ClickhouseAddr string 59 ClickhouseDatabase string 60 ClickhouseUser string 61 ClickhousePass string 62 RatelimitBypassKey string 63 NervanaEndpoint string 64 NervanaApiKey string 65} 66 67func New(ctx context.Context, args *Args) (*Photocopy, error) { 68 conn, err := clickhouse.Open(&clickhouse.Options{ 69 Addr: []string{args.ClickhouseAddr}, 70 Auth: clickhouse.Auth{ 71 Database: args.ClickhouseDatabase, 72 Username: args.ClickhouseUser, 73 Password: args.ClickhousePass, 74 }, 75 }) 76 if err != nil { 77 return nil, err 78 } 79 80 p := &Photocopy{ 81 logger: args.Logger, 82 metricsAddr: args.MetricsAddr, 83 relayHost: args.RelayHost, 84 wg: sync.WaitGroup{}, 85 cursorFile: args.CursorFile, 86 ratelimitBypassKey: args.RatelimitBypassKey, 87 conn: conn, 88 } 89 90 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{ 91 Name: "photocopy_inserts_time", 92 Help: "histogram of photocopy inserts", 93 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20), 94 }, []string{"type"}) 95 96 fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 97 PrometheusCounterPrefix: "photocopy_follows", 98 Histogram: insertionsHist, 99 BatchSize: 500, 100 Logger: p.logger, 101 Conn: conn, 102 Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)", 103 RateLimit: 3, 104 }) 105 if err != nil { 106 return nil, err 107 } 108 109 pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 110 PrometheusCounterPrefix: "photocopy_posts", 111 Histogram: insertionsHist, 112 BatchSize: 300, 113 Logger: p.logger, 114 Conn: conn, 115 Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did, lang)", 116 RateLimit: 3, 117 }) 118 if err != nil { 119 return nil, err 120 } 121 122 ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 123 PrometheusCounterPrefix: "photocopy_interactions", 124 Histogram: insertionsHist, 125 BatchSize: 1000, 126 Logger: p.logger, 127 Conn: conn, 128 Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)", 129 RateLimit: 3, 130 }) 131 if err != nil { 132 return nil, err 133 } 134 135 ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 136 PrometheusCounterPrefix: "photocopy_records", 137 Histogram: insertionsHist, 138 BatchSize: 2500, 139 Logger: p.logger, 140 Conn: conn, 141 Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)", 142 RateLimit: 3, 143 }) 144 if err != nil { 145 return nil, err 146 } 147 148 di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 149 PrometheusCounterPrefix: "photocopy_deletes", 150 Histogram: insertionsHist, 151 BatchSize: 500, 152 Logger: p.logger, 153 Conn: conn, 154 Query: "INSERT INTO delete (did, rkey, created_at)", 155 RateLimit: 3, 156 }) 157 if err != nil { 158 return nil, err 159 } 160 161 li, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 162 PrometheusCounterPrefix: "photocopy_labels", 163 Histogram: insertionsHist, 164 BatchSize: 100, 165 Logger: p.logger, 166 Conn: conn, 167 Query: "INSERT INTO post_label (did, rkey, text, label, entity_id, description, topic, created_at)", 168 RateLimit: 3, 169 }) 170 if err != nil { 171 return nil, err 172 } 173 174 is := &Inserters{ 175 followsInserter: fi, 176 postsInserter: pi, 177 interactionsInserter: ii, 178 recordsInserter: ri, 179 deletesInserter: di, 180 labelsInserter: li, 181 } 182 183 p.inserters = is 184 185 plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 186 PrometheusCounterPrefix: "photocopy_plc_entries", 187 Histogram: insertionsHist, 188 BatchSize: 100, 189 Logger: args.Logger, 190 Conn: conn, 191 Query: `INSERT INTO plc ( 192 did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type, 193 plc_op_services, plc_op_also_known_as, plc_op_rotation_keys, 194 plc_tomb_sig, plc_tomb_prev, plc_tomb_type, 195 legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle, 196 legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key 197 )`, 198 }) 199 if err != nil { 200 return nil, err 201 } 202 203 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{ 204 Logger: p.logger, 205 Inserter: plci, 206 CursorFile: args.PLCScraperCursorFile, 207 }) 208 if err != nil { 209 return nil, err 210 } 211 212 p.inserters.plcInserter = plci 213 p.plcScraper = plcs 214 215 if args.NervanaApiKey != "" && args.NervanaEndpoint != "" { 216 p.nervanaClient = nervana.NewClient(args.NervanaEndpoint, args.NervanaApiKey) 217 } 218 219 return p, nil 220} 221 222func (p *Photocopy) Run(baseCtx context.Context, withBackfill bool) error { 223 ctx, cancel := context.WithCancel(baseCtx) 224 225 metricsServer := http.NewServeMux() 226 metricsServer.Handle("/metrics", promhttp.Handler()) 227 228 go func() { 229 p.logger.Info("Starting metrics server") 230 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil { 231 p.logger.Error("metrics server failed", "error", err) 232 } 233 }() 234 235 go func(ctx context.Context, cancel context.CancelFunc) { 236 p.logger.Info("starting relay", "relayHost", p.relayHost) 237 if err := p.startConsumer(ctx, cancel); err != nil { 238 panic(fmt.Errorf("failed to start consumer: %w", err)) 239 } 240 }(ctx, cancel) 241 242 go func(ctx context.Context) { 243 if err := p.plcScraper.Run(ctx); err != nil { 244 panic(fmt.Errorf("failed to start plc scraper: %w", err)) 245 } 246 }(ctx) 247 248 if withBackfill { 249 go func(ctx context.Context) { 250 if err := p.runBackfiller(ctx); err != nil { 251 panic(fmt.Errorf("error starting backfiller: %w", err)) 252 } 253 }(ctx) 254 } 255 256 <-ctx.Done() 257 258 if p.inserters != nil { 259 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) 260 261 p.logger.Info("stopping inserters") 262 263 if p.inserters.followsInserter != nil { 264 p.wg.Add(1) 265 go func() { 266 defer p.wg.Done() 267 if err := p.inserters.followsInserter.Close(ctx); err != nil { 268 p.logger.Error("failed to close follows inserter", "error", err) 269 return 270 } 271 p.logger.Info("follows inserter closed") 272 }() 273 } 274 275 if p.inserters.interactionsInserter != nil { 276 p.wg.Add(1) 277 go func() { 278 defer p.wg.Done() 279 if err := p.inserters.interactionsInserter.Close(ctx); err != nil { 280 p.logger.Error("failed to close interactions inserter", "error", err) 281 return 282 } 283 p.logger.Info("interactions inserter closed") 284 }() 285 } 286 287 if p.inserters.postsInserter != nil { 288 p.wg.Add(1) 289 go func() { 290 defer p.wg.Done() 291 if err := p.inserters.postsInserter.Close(ctx); err != nil { 292 p.logger.Error("failed to close posts inserter", "error", err) 293 return 294 } 295 p.logger.Info("posts inserter closed") 296 }() 297 } 298 299 if p.inserters.recordsInserter != nil { 300 p.wg.Add(1) 301 go func() { 302 defer p.wg.Done() 303 if err := p.inserters.recordsInserter.Close(ctx); err != nil { 304 p.logger.Error("failed to close records inserter", "error", err) 305 return 306 } 307 p.logger.Info("records inserter closed") 308 }() 309 } 310 311 if p.inserters.deletesInserter != nil { 312 p.wg.Add(1) 313 go func() { 314 defer p.wg.Done() 315 if err := p.inserters.deletesInserter.Close(ctx); err != nil { 316 p.logger.Error("failed to close deletes inserter", "error", err) 317 return 318 } 319 p.logger.Info("deletes inserter closed") 320 }() 321 } 322 323 if p.inserters.plcInserter != nil { 324 p.wg.Add(1) 325 go func() { 326 defer p.wg.Done() 327 if err := p.inserters.plcInserter.Close(ctx); err != nil { 328 p.logger.Error("failed to close plc inserter", "error", err) 329 return 330 } 331 p.logger.Info("plc inserter closed") 332 }() 333 } 334 335 p.wg.Wait() 336 337 cancel() 338 339 p.logger.Info("inserters closed") 340 } 341 342 return nil 343}