this repo has no description
1package photocopy 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "net/http" 8 "sync" 9 "time" 10 11 inserter "github.com/haileyok/photocopy/internal" 12 "github.com/prometheus/client_golang/prometheus" 13 "github.com/prometheus/client_golang/prometheus/promauto" 14 "github.com/prometheus/client_golang/prometheus/promhttp" 15) 16 17type Photocopy struct { 18 logger *slog.Logger 19 wg sync.WaitGroup 20 21 relayHost string 22 cursor string 23 cursorFile string 24 metricsAddr string 25 26 inserters *Inserters 27 28 plcScraper *PLCScraper 29} 30 31type Inserters struct { 32 followsInserter *inserter.Inserter 33 plcInserter *inserter.Inserter 34} 35 36type Args struct { 37 Logger *slog.Logger 38 RelayHost string 39 MetricsAddr string 40 CursorFile string 41 PLCScraperCursorFile string 42 ClickhouseUser string 43 ClickhousePass string 44} 45 46func New(ctx context.Context, args *Args) (*Photocopy, error) { 47 p := &Photocopy{ 48 logger: args.Logger, 49 metricsAddr: args.MetricsAddr, 50 relayHost: args.RelayHost, 51 wg: sync.WaitGroup{}, 52 cursorFile: args.CursorFile, 53 } 54 55 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{ 56 Name: "photocopy_inserts_time", 57 Help: "histogram of photocopy inserts", 58 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20), 59 }, []string{"type"}) 60 61 fi, err := inserter.New(ctx, &inserter.Args{ 62 PrometheusCounterPrefix: "photocopy_follows", 63 Histogram: insertionsHist, 64 BatchSize: 100, 65 Logger: p.logger, 66 }) 67 if err != nil { 68 return nil, err 69 } 70 71 is := &Inserters{ 72 followsInserter: fi, 73 } 74 75 p.inserters = is 76 77 plci, err := inserter.New(ctx, &inserter.Args{ 78 PrometheusCounterPrefix: "photocopy_plc_entries", 79 Histogram: insertionsHist, 80 BatchSize: 100, 81 Logger: args.Logger, 82 }) 83 if err != nil { 84 return nil, err 85 } 86 87 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{ 88 Logger: p.logger, 89 Inserter: plci, 90 CursorFile: args.PLCScraperCursorFile, 91 }) 92 if err != nil { 93 return nil, err 94 } 95 96 p.inserters.plcInserter = plci 97 p.plcScraper = plcs 98 99 return p, nil 100} 101 102func (p *Photocopy) Run(baseCtx context.Context) error { 103 ctx, cancel := context.WithCancel(baseCtx) 104 105 metricsServer := http.NewServeMux() 106 metricsServer.Handle("/metrics", promhttp.Handler()) 107 108 go func() { 109 p.logger.Info("Starting metrics server") 110 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil { 111 p.logger.Error("metrics server failed", "error", err) 112 } 113 }() 114 115 go func(ctx context.Context, cancel context.CancelFunc) { 116 p.logger.Info("starting relay", "relayHost", p.relayHost) 117 if err := p.startConsumer(ctx, cancel); err != nil { 118 panic(fmt.Errorf("failed to start consumer: %w", err)) 119 } 120 }(ctx, cancel) 121 122 go func(ctx context.Context) { 123 if err := p.plcScraper.Run(ctx); err != nil { 124 panic(fmt.Errorf("failed to start plc scraper: %w", err)) 125 } 126 }(ctx) 127 128 <-ctx.Done() 129 130 if p.inserters != nil { 131 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) 132 133 p.logger.Info("stopping inserters") 134 135 if p.inserters.followsInserter != nil { 136 p.wg.Add(1) 137 go func() { 138 defer p.wg.Done() 139 if err := p.inserters.followsInserter.Close(ctx); err != nil { 140 p.logger.Error("failed to close follows inserter", "error", err) 141 return 142 } 143 p.logger.Info("follows inserter closed") 144 }() 145 } 146 147 if p.inserters.plcInserter != nil { 148 p.wg.Add(1) 149 go func() { 150 defer p.wg.Done() 151 if err := p.inserters.plcInserter.Close(ctx); err != nil { 152 p.logger.Error("failed to close plc inserter", "error", err) 153 return 154 } 155 p.logger.Info("plc inserter closed") 156 }() 157 } 158 159 p.wg.Wait() 160 161 cancel() 162 163 p.logger.Info("inserters closed") 164 } 165 166 return nil 167}