this repo has no description
1package photocopy 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "net/http" 8 "sync" 9 "time" 10 11 "github.com/ClickHouse/clickhouse-go/v2" 12 "github.com/haileyok/photocopy/clickhouse_inserter" 13 "github.com/prometheus/client_golang/prometheus" 14 "github.com/prometheus/client_golang/prometheus/promauto" 15 "github.com/prometheus/client_golang/prometheus/promhttp" 16) 17 18type Photocopy struct { 19 logger *slog.Logger 20 wg sync.WaitGroup 21 22 relayHost string 23 cursor string 24 cursorFile string 25 metricsAddr string 26 27 inserters *Inserters 28 29 plcScraper *PLCScraper 30} 31 32type Inserters struct { 33 followsInserter *clickhouse_inserter.Inserter 34 plcInserter *clickhouse_inserter.Inserter 35} 36 37type Args struct { 38 Logger *slog.Logger 39 RelayHost string 40 MetricsAddr string 41 CursorFile string 42 PLCScraperCursorFile string 43 ClickhouseAddr string 44 ClickhouseDatabase string 45 ClickhouseUser string 46 ClickhousePass string 47} 48 49func New(ctx context.Context, args *Args) (*Photocopy, error) { 50 p := &Photocopy{ 51 logger: args.Logger, 52 metricsAddr: args.MetricsAddr, 53 relayHost: args.RelayHost, 54 wg: sync.WaitGroup{}, 55 cursorFile: args.CursorFile, 56 } 57 58 conn, err := clickhouse.Open(&clickhouse.Options{ 59 Addr: []string{args.ClickhouseAddr}, 60 Auth: clickhouse.Auth{ 61 Database: args.ClickhouseDatabase, 62 Username: args.ClickhouseUser, 63 Password: args.ClickhousePass, 64 }, 65 }) 66 if err != nil { 67 return nil, err 68 } 69 70 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{ 71 Name: "photocopy_inserts_time", 72 Help: "histogram of photocopy inserts", 73 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20), 74 }, []string{"type"}) 75 76 fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 77 PrometheusCounterPrefix: "photocopy_follows", 78 Histogram: insertionsHist, 79 BatchSize: 100, 80 Logger: p.logger, 81 Conn: conn, 82 Query: "", 83 }) 84 if err != nil { 85 return nil, err 86 } 87 88 is := &Inserters{ 89 followsInserter: fi, 90 } 91 92 p.inserters = is 93 94 plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 95 PrometheusCounterPrefix: "photocopy_plc_entries", 96 Histogram: insertionsHist, 97 BatchSize: 100, 98 Logger: args.Logger, 99 Conn: conn, 100 Query: `INSERT INTO your_table_name ( 101 did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type, 102 plc_op_services, plc_op_also_known_as, plc_op_rotation_keys, 103 plc_op_verification_methods, plc_tomb_sig, plc_tomb_prev, plc_tomb_type, 104 legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle, 105 legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key 106 )`, 107 }) 108 if err != nil { 109 return nil, err 110 } 111 112 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{ 113 Logger: p.logger, 114 Inserter: plci, 115 CursorFile: args.PLCScraperCursorFile, 116 }) 117 if err != nil { 118 return nil, err 119 } 120 121 p.inserters.plcInserter = plci 122 p.plcScraper = plcs 123 124 return p, nil 125} 126 127func (p *Photocopy) Run(baseCtx context.Context) error { 128 ctx, cancel := context.WithCancel(baseCtx) 129 130 metricsServer := http.NewServeMux() 131 metricsServer.Handle("/metrics", promhttp.Handler()) 132 133 go func() { 134 p.logger.Info("Starting metrics server") 135 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil { 136 p.logger.Error("metrics server failed", "error", err) 137 } 138 }() 139 140 go func(ctx context.Context, cancel context.CancelFunc) { 141 p.logger.Info("starting relay", "relayHost", p.relayHost) 142 if err := p.startConsumer(ctx, cancel); err != nil { 143 panic(fmt.Errorf("failed to start consumer: %w", err)) 144 } 145 }(ctx, cancel) 146 147 go func(ctx context.Context) { 148 if err := p.plcScraper.Run(ctx); err != nil { 149 panic(fmt.Errorf("failed to start plc scraper: %w", err)) 150 } 151 }(ctx) 152 153 <-ctx.Done() 154 155 if p.inserters != nil { 156 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) 157 158 p.logger.Info("stopping inserters") 159 160 if p.inserters.followsInserter != nil { 161 p.wg.Add(1) 162 go func() { 163 defer p.wg.Done() 164 if err := p.inserters.followsInserter.Close(ctx); err != nil { 165 p.logger.Error("failed to close follows inserter", "error", err) 166 return 167 } 168 p.logger.Info("follows inserter closed") 169 }() 170 } 171 172 if p.inserters.plcInserter != nil { 173 p.wg.Add(1) 174 go func() { 175 defer p.wg.Done() 176 if err := p.inserters.plcInserter.Close(ctx); err != nil { 177 p.logger.Error("failed to close plc inserter", "error", err) 178 return 179 } 180 p.logger.Info("plc inserter closed") 181 }() 182 } 183 184 p.wg.Wait() 185 186 cancel() 187 188 p.logger.Info("inserters closed") 189 } 190 191 return nil 192}