this repo has no description
1package photocopy 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "net/http" 8 "sync" 9 "time" 10 11 "github.com/ClickHouse/clickhouse-go/v2" 12 "github.com/haileyok/photocopy/clickhouse_inserter" 13 "github.com/prometheus/client_golang/prometheus" 14 "github.com/prometheus/client_golang/prometheus/promauto" 15 "github.com/prometheus/client_golang/prometheus/promhttp" 16) 17 18type Photocopy struct { 19 logger *slog.Logger 20 wg sync.WaitGroup 21 22 relayHost string 23 cursor string 24 cursorFile string 25 metricsAddr string 26 27 inserters *Inserters 28 29 plcScraper *PLCScraper 30} 31 32type Inserters struct { 33 followsInserter *clickhouse_inserter.Inserter 34 interactionsInserter *clickhouse_inserter.Inserter 35 postsInserter *clickhouse_inserter.Inserter 36 plcInserter *clickhouse_inserter.Inserter 37 recordsInserter *clickhouse_inserter.Inserter 38 deletesInserter *clickhouse_inserter.Inserter 39} 40 41type Args struct { 42 Logger *slog.Logger 43 RelayHost string 44 MetricsAddr string 45 CursorFile string 46 PLCScraperCursorFile string 47 ClickhouseAddr string 48 ClickhouseDatabase string 49 ClickhouseUser string 50 ClickhousePass string 51} 52 53func New(ctx context.Context, args *Args) (*Photocopy, error) { 54 p := &Photocopy{ 55 logger: args.Logger, 56 metricsAddr: args.MetricsAddr, 57 relayHost: args.RelayHost, 58 wg: sync.WaitGroup{}, 59 cursorFile: args.CursorFile, 60 } 61 62 conn, err := clickhouse.Open(&clickhouse.Options{ 63 Addr: []string{args.ClickhouseAddr}, 64 Auth: clickhouse.Auth{ 65 Database: args.ClickhouseDatabase, 66 Username: args.ClickhouseUser, 67 Password: args.ClickhousePass, 68 }, 69 }) 70 if err != nil { 71 return nil, err 72 } 73 74 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{ 75 Name: "photocopy_inserts_time", 76 Help: "histogram of photocopy inserts", 77 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20), 78 }, []string{"type"}) 79 80 fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 81 PrometheusCounterPrefix: "photocopy_follows", 82 Histogram: insertionsHist, 83 BatchSize: 1000, 84 Logger: p.logger, 85 Conn: conn, 86 Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)", 87 }) 88 if err != nil { 89 return nil, err 90 } 91 92 pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 93 PrometheusCounterPrefix: "photocopy_posts", 94 Histogram: insertionsHist, 95 BatchSize: 100, 96 Logger: p.logger, 97 Conn: conn, 98 Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did)", 99 }) 100 if err != nil { 101 return nil, err 102 } 103 104 ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 105 PrometheusCounterPrefix: "photocopy_interactions", 106 Histogram: insertionsHist, 107 BatchSize: 1000, 108 Logger: p.logger, 109 Conn: conn, 110 Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)", 111 }) 112 if err != nil { 113 return nil, err 114 } 115 116 ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 117 PrometheusCounterPrefix: "photocopy_records", 118 Histogram: insertionsHist, 119 BatchSize: 1000, 120 Logger: p.logger, 121 Conn: conn, 122 Query: "INSERT INTO records (did, rkey, collection, cid, seq, raw, created_at)", 123 }) 124 if err != nil { 125 return nil, err 126 } 127 128 di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 129 PrometheusCounterPrefix: "photocopy_records", 130 Histogram: insertionsHist, 131 BatchSize: 1000, 132 Logger: p.logger, 133 Conn: conn, 134 Query: "INSERT INTO deletes (did, rkey, created_at)", 135 }) 136 if err != nil { 137 return nil, err 138 } 139 140 is := &Inserters{ 141 followsInserter: fi, 142 postsInserter: pi, 143 interactionsInserter: ii, 144 recordsInserter: ri, 145 deletesInserter: di, 146 } 147 148 p.inserters = is 149 150 plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 151 PrometheusCounterPrefix: "photocopy_plc_entries", 152 Histogram: insertionsHist, 153 BatchSize: 100, 154 Logger: args.Logger, 155 Conn: conn, 156 Query: `INSERT INTO plc ( 157 did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type, 158 plc_op_services, plc_op_also_known_as, plc_op_rotation_keys, 159 plc_tomb_sig, plc_tomb_prev, plc_tomb_type, 160 legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle, 161 legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key 162 )`, 163 }) 164 if err != nil { 165 return nil, err 166 } 167 168 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{ 169 Logger: p.logger, 170 Inserter: plci, 171 CursorFile: args.PLCScraperCursorFile, 172 }) 173 if err != nil { 174 return nil, err 175 } 176 177 p.inserters.plcInserter = plci 178 p.plcScraper = plcs 179 180 return p, nil 181} 182 183func (p *Photocopy) Run(baseCtx context.Context) error { 184 ctx, cancel := context.WithCancel(baseCtx) 185 186 metricsServer := http.NewServeMux() 187 metricsServer.Handle("/metrics", promhttp.Handler()) 188 189 go func() { 190 p.logger.Info("Starting metrics server") 191 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil { 192 p.logger.Error("metrics server failed", "error", err) 193 } 194 }() 195 196 go func(ctx context.Context, cancel context.CancelFunc) { 197 p.logger.Info("starting relay", "relayHost", p.relayHost) 198 if err := p.startConsumer(ctx, cancel); err != nil { 199 panic(fmt.Errorf("failed to start consumer: %w", err)) 200 } 201 }(ctx, cancel) 202 203 go func(ctx context.Context) { 204 if err := p.plcScraper.Run(ctx); err != nil { 205 panic(fmt.Errorf("failed to start plc scraper: %w", err)) 206 } 207 }(ctx) 208 209 <-ctx.Done() 210 211 if p.inserters != nil { 212 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) 213 214 p.logger.Info("stopping inserters") 215 216 if p.inserters.followsInserter != nil { 217 p.wg.Add(1) 218 go func() { 219 defer p.wg.Done() 220 if err := p.inserters.followsInserter.Close(ctx); err != nil { 221 p.logger.Error("failed to close follows inserter", "error", err) 222 return 223 } 224 p.logger.Info("follows inserter closed") 225 }() 226 } 227 228 if p.inserters.plcInserter != nil { 229 p.wg.Add(1) 230 go func() { 231 defer p.wg.Done() 232 if err := p.inserters.plcInserter.Close(ctx); err != nil { 233 p.logger.Error("failed to close plc inserter", "error", err) 234 return 235 } 236 p.logger.Info("plc inserter closed") 237 }() 238 } 239 240 p.wg.Wait() 241 242 cancel() 243 244 p.logger.Info("inserters closed") 245 } 246 247 return nil 248}