this repo has no description
1package photocopy 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "net/http" 8 "sync" 9 "time" 10 11 "github.com/ClickHouse/clickhouse-go/v2" 12 "github.com/haileyok/photocopy/clickhouse_inserter" 13 "github.com/prometheus/client_golang/prometheus" 14 "github.com/prometheus/client_golang/prometheus/promauto" 15 "github.com/prometheus/client_golang/prometheus/promhttp" 16) 17 18type Photocopy struct { 19 logger *slog.Logger 20 wg sync.WaitGroup 21 22 relayHost string 23 cursor string 24 cursorFile string 25 metricsAddr string 26 27 inserters *Inserters 28 29 plcScraper *PLCScraper 30} 31 32type Inserters struct { 33 followsInserter *clickhouse_inserter.Inserter 34 interactionsInserter *clickhouse_inserter.Inserter 35 postsInserter *clickhouse_inserter.Inserter 36 plcInserter *clickhouse_inserter.Inserter 37} 38 39type Args struct { 40 Logger *slog.Logger 41 RelayHost string 42 MetricsAddr string 43 CursorFile string 44 PLCScraperCursorFile string 45 ClickhouseAddr string 46 ClickhouseDatabase string 47 ClickhouseUser string 48 ClickhousePass string 49} 50 51func New(ctx context.Context, args *Args) (*Photocopy, error) { 52 p := &Photocopy{ 53 logger: args.Logger, 54 metricsAddr: args.MetricsAddr, 55 relayHost: args.RelayHost, 56 wg: sync.WaitGroup{}, 57 cursorFile: args.CursorFile, 58 } 59 60 conn, err := clickhouse.Open(&clickhouse.Options{ 61 Addr: []string{args.ClickhouseAddr}, 62 Auth: clickhouse.Auth{ 63 Database: args.ClickhouseDatabase, 64 Username: args.ClickhouseUser, 65 Password: args.ClickhousePass, 66 }, 67 }) 68 if err != nil { 69 return nil, err 70 } 71 72 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{ 73 Name: "photocopy_inserts_time", 74 Help: "histogram of photocopy inserts", 75 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20), 76 }, []string{"type"}) 77 78 fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 79 PrometheusCounterPrefix: "photocopy_follows", 80 Histogram: insertionsHist, 81 BatchSize: 1000, 82 Logger: p.logger, 83 Conn: conn, 84 Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)", 85 }) 86 if err != nil { 87 return nil, err 88 } 89 90 pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 91 PrometheusCounterPrefix: "photocopy_posts", 92 Histogram: insertionsHist, 93 BatchSize: 100, 94 Logger: p.logger, 95 Conn: conn, 96 Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did)", 97 }) 98 if err != nil { 99 return nil, err 100 } 101 102 ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 103 PrometheusCounterPrefix: "photocopy_interactions", 104 Histogram: insertionsHist, 105 BatchSize: 1000, 106 Logger: p.logger, 107 Conn: conn, 108 Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)", 109 }) 110 if err != nil { 111 return nil, err 112 } 113 114 is := &Inserters{ 115 followsInserter: fi, 116 postsInserter: pi, 117 interactionsInserter: ii, 118 } 119 120 p.inserters = is 121 122 plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 123 PrometheusCounterPrefix: "photocopy_plc_entries", 124 Histogram: insertionsHist, 125 BatchSize: 100, 126 Logger: args.Logger, 127 Conn: conn, 128 Query: `INSERT INTO plc ( 129 did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type, 130 plc_op_services, plc_op_also_known_as, plc_op_rotation_keys, 131 plc_tomb_sig, plc_tomb_prev, plc_tomb_type, 132 legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle, 133 legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key 134 )`, 135 }) 136 if err != nil { 137 return nil, err 138 } 139 140 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{ 141 Logger: p.logger, 142 Inserter: plci, 143 CursorFile: args.PLCScraperCursorFile, 144 }) 145 if err != nil { 146 return nil, err 147 } 148 149 p.inserters.plcInserter = plci 150 p.plcScraper = plcs 151 152 return p, nil 153} 154 155func (p *Photocopy) Run(baseCtx context.Context) error { 156 ctx, cancel := context.WithCancel(baseCtx) 157 158 metricsServer := http.NewServeMux() 159 metricsServer.Handle("/metrics", promhttp.Handler()) 160 161 go func() { 162 p.logger.Info("Starting metrics server") 163 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil { 164 p.logger.Error("metrics server failed", "error", err) 165 } 166 }() 167 168 go func(ctx context.Context, cancel context.CancelFunc) { 169 p.logger.Info("starting relay", "relayHost", p.relayHost) 170 if err := p.startConsumer(ctx, cancel); err != nil { 171 panic(fmt.Errorf("failed to start consumer: %w", err)) 172 } 173 }(ctx, cancel) 174 175 go func(ctx context.Context) { 176 if err := p.plcScraper.Run(ctx); err != nil { 177 panic(fmt.Errorf("failed to start plc scraper: %w", err)) 178 } 179 }(ctx) 180 181 <-ctx.Done() 182 183 if p.inserters != nil { 184 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) 185 186 p.logger.Info("stopping inserters") 187 188 if p.inserters.followsInserter != nil { 189 p.wg.Add(1) 190 go func() { 191 defer p.wg.Done() 192 if err := p.inserters.followsInserter.Close(ctx); err != nil { 193 p.logger.Error("failed to close follows inserter", "error", err) 194 return 195 } 196 p.logger.Info("follows inserter closed") 197 }() 198 } 199 200 if p.inserters.plcInserter != nil { 201 p.wg.Add(1) 202 go func() { 203 defer p.wg.Done() 204 if err := p.inserters.plcInserter.Close(ctx); err != nil { 205 p.logger.Error("failed to close plc inserter", "error", err) 206 return 207 } 208 p.logger.Info("plc inserter closed") 209 }() 210 } 211 212 p.wg.Wait() 213 214 cancel() 215 216 p.logger.Info("inserters closed") 217 } 218 219 return nil 220}