1package photocopy
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "net/http"
8 "sync"
9 "time"
10
11 inserter "github.com/haileyok/photocopy/internal"
12 "github.com/prometheus/client_golang/prometheus"
13 "github.com/prometheus/client_golang/prometheus/promauto"
14 "github.com/prometheus/client_golang/prometheus/promhttp"
15)
16
17type Photocopy struct {
18 logger *slog.Logger
19 wg sync.WaitGroup
20
21 relayHost string
22 cursor string
23 cursorFile string
24 metricsAddr string
25
26 inserters *Inserters
27
28 plcScraper *PLCScraper
29}
30
31type Inserters struct {
32 followsInserter *inserter.Inserter
33 plcInserter *inserter.Inserter
34}
35
// Args carries the configuration accepted by New.
type Args struct {
	// Logger is handed to the Photocopy instance, its inserters and the PLC
	// scraper.
	Logger *slog.Logger

	// RelayHost is the relay to consume from; MetricsAddr is the listen
	// address of the metrics HTTP server.
	RelayHost, MetricsAddr string

	// CursorFile is the cursor file of the Photocopy instance;
	// PLCScraperCursorFile is the cursor file handed to the PLC scraper.
	CursorFile, PLCScraperCursorFile string

	// ClickhouseUser and ClickhousePass are ClickHouse credentials.
	ClickhouseUser, ClickhousePass string
}
45
46func New(ctx context.Context, args *Args) (*Photocopy, error) {
47 p := &Photocopy{
48 logger: args.Logger,
49 metricsAddr: args.MetricsAddr,
50 relayHost: args.RelayHost,
51 wg: sync.WaitGroup{},
52 cursorFile: args.CursorFile,
53 }
54
55 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{
56 Name: "photocopy_inserts_time",
57 Help: "histogram of photocopy inserts",
58 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20),
59 }, []string{"type"})
60
61 fi, err := inserter.New(ctx, &inserter.Args{
62 PrometheusCounterPrefix: "photocopy_follows",
63 Histogram: insertionsHist,
64 BatchSize: 100,
65 Logger: p.logger,
66 })
67 if err != nil {
68 return nil, err
69 }
70
71 is := &Inserters{
72 followsInserter: fi,
73 }
74
75 p.inserters = is
76
77 plci, err := inserter.New(ctx, &inserter.Args{
78 PrometheusCounterPrefix: "photocopy_plc_entries",
79 Histogram: insertionsHist,
80 BatchSize: 100,
81 Logger: args.Logger,
82 })
83 if err != nil {
84 return nil, err
85 }
86
87 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{
88 Logger: p.logger,
89 Inserter: plci,
90 CursorFile: args.PLCScraperCursorFile,
91 })
92 if err != nil {
93 return nil, err
94 }
95
96 p.inserters.plcInserter = plci
97 p.plcScraper = plcs
98
99 return p, nil
100}
101
102func (p *Photocopy) Run(baseCtx context.Context) error {
103 ctx, cancel := context.WithCancel(baseCtx)
104
105 metricsServer := http.NewServeMux()
106 metricsServer.Handle("/metrics", promhttp.Handler())
107
108 go func() {
109 p.logger.Info("Starting metrics server")
110 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil {
111 p.logger.Error("metrics server failed", "error", err)
112 }
113 }()
114
115 go func(ctx context.Context, cancel context.CancelFunc) {
116 p.logger.Info("starting relay", "relayHost", p.relayHost)
117 if err := p.startConsumer(ctx, cancel); err != nil {
118 panic(fmt.Errorf("failed to start consumer: %w", err))
119 }
120 }(ctx, cancel)
121
122 go func(ctx context.Context) {
123 if err := p.plcScraper.Run(ctx); err != nil {
124 panic(fmt.Errorf("failed to start plc scraper: %w", err))
125 }
126 }(ctx)
127
128 <-ctx.Done()
129
130 if p.inserters != nil {
131 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
132
133 p.logger.Info("stopping inserters")
134
135 if p.inserters.followsInserter != nil {
136 p.wg.Add(1)
137 go func() {
138 defer p.wg.Done()
139 if err := p.inserters.followsInserter.Close(ctx); err != nil {
140 p.logger.Error("failed to close follows inserter", "error", err)
141 return
142 }
143 p.logger.Info("follows inserter closed")
144 }()
145 }
146
147 if p.inserters.plcInserter != nil {
148 p.wg.Add(1)
149 go func() {
150 defer p.wg.Done()
151 if err := p.inserters.plcInserter.Close(ctx); err != nil {
152 p.logger.Error("failed to close plc inserter", "error", err)
153 return
154 }
155 p.logger.Info("plc inserter closed")
156 }()
157 }
158
159 p.wg.Wait()
160
161 cancel()
162
163 p.logger.Info("inserters closed")
164 }
165
166 return nil
167}