1package photocopy
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "net/http"
8 "sync"
9 "time"
10
11 "github.com/ClickHouse/clickhouse-go/v2"
12 "github.com/haileyok/photocopy/clickhouse_inserter"
13 "github.com/prometheus/client_golang/prometheus"
14 "github.com/prometheus/client_golang/prometheus/promauto"
15 "github.com/prometheus/client_golang/prometheus/promhttp"
16)
17
18type Photocopy struct {
19 logger *slog.Logger
20 wg sync.WaitGroup
21
22 relayHost string
23 cursor string
24 cursorFile string
25 metricsAddr string
26
27 inserters *Inserters
28
29 plcScraper *PLCScraper
30}
31
32type Inserters struct {
33 followsInserter *clickhouse_inserter.Inserter
34 plcInserter *clickhouse_inserter.Inserter
35}
36
// Args carries the configuration consumed by New.
type Args struct {
	// Logger is the logger handed to the Photocopy and its components.
	Logger *slog.Logger

	// RelayHost is the relay to consume from; MetricsAddr is the listen
	// address of the Prometheus metrics HTTP server.
	RelayHost, MetricsAddr string

	// CursorFile is stored on the Photocopy; PLCScraperCursorFile is passed
	// on to the PLC scraper.
	CursorFile, PLCScraperCursorFile string

	// ClickHouse connection settings: server address, database and
	// credentials.
	ClickhouseAddr, ClickhouseDatabase, ClickhouseUser, ClickhousePass string
}
48
49func New(ctx context.Context, args *Args) (*Photocopy, error) {
50 p := &Photocopy{
51 logger: args.Logger,
52 metricsAddr: args.MetricsAddr,
53 relayHost: args.RelayHost,
54 wg: sync.WaitGroup{},
55 cursorFile: args.CursorFile,
56 }
57
58 conn, err := clickhouse.Open(&clickhouse.Options{
59 Addr: []string{args.ClickhouseAddr},
60 Auth: clickhouse.Auth{
61 Database: args.ClickhouseDatabase,
62 Username: args.ClickhouseUser,
63 Password: args.ClickhousePass,
64 },
65 })
66 if err != nil {
67 return nil, err
68 }
69
70 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{
71 Name: "photocopy_inserts_time",
72 Help: "histogram of photocopy inserts",
73 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20),
74 }, []string{"type"})
75
76 fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
77 PrometheusCounterPrefix: "photocopy_follows",
78 Histogram: insertionsHist,
79 BatchSize: 100,
80 Logger: p.logger,
81 Conn: conn,
82 Query: "",
83 })
84 if err != nil {
85 return nil, err
86 }
87
88 is := &Inserters{
89 followsInserter: fi,
90 }
91
92 p.inserters = is
93
94 plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
95 PrometheusCounterPrefix: "photocopy_plc_entries",
96 Histogram: insertionsHist,
97 BatchSize: 100,
98 Logger: args.Logger,
99 Conn: conn,
100 Query: `INSERT INTO plc (
101 did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type,
102 plc_op_services, plc_op_also_known_as, plc_op_rotation_keys,
103 plc_op_verification_methods, plc_tomb_sig, plc_tomb_prev, plc_tomb_type,
104 legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle,
105 legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key
106 )`,
107 })
108 if err != nil {
109 return nil, err
110 }
111
112 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{
113 Logger: p.logger,
114 Inserter: plci,
115 CursorFile: args.PLCScraperCursorFile,
116 })
117 if err != nil {
118 return nil, err
119 }
120
121 p.inserters.plcInserter = plci
122 p.plcScraper = plcs
123
124 return p, nil
125}
126
127func (p *Photocopy) Run(baseCtx context.Context) error {
128 ctx, cancel := context.WithCancel(baseCtx)
129
130 metricsServer := http.NewServeMux()
131 metricsServer.Handle("/metrics", promhttp.Handler())
132
133 go func() {
134 p.logger.Info("Starting metrics server")
135 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil {
136 p.logger.Error("metrics server failed", "error", err)
137 }
138 }()
139
140 go func(ctx context.Context, cancel context.CancelFunc) {
141 p.logger.Info("starting relay", "relayHost", p.relayHost)
142 if err := p.startConsumer(ctx, cancel); err != nil {
143 panic(fmt.Errorf("failed to start consumer: %w", err))
144 }
145 }(ctx, cancel)
146
147 go func(ctx context.Context) {
148 if err := p.plcScraper.Run(ctx); err != nil {
149 panic(fmt.Errorf("failed to start plc scraper: %w", err))
150 }
151 }(ctx)
152
153 <-ctx.Done()
154
155 if p.inserters != nil {
156 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
157
158 p.logger.Info("stopping inserters")
159
160 if p.inserters.followsInserter != nil {
161 p.wg.Add(1)
162 go func() {
163 defer p.wg.Done()
164 if err := p.inserters.followsInserter.Close(ctx); err != nil {
165 p.logger.Error("failed to close follows inserter", "error", err)
166 return
167 }
168 p.logger.Info("follows inserter closed")
169 }()
170 }
171
172 if p.inserters.plcInserter != nil {
173 p.wg.Add(1)
174 go func() {
175 defer p.wg.Done()
176 if err := p.inserters.plcInserter.Close(ctx); err != nil {
177 p.logger.Error("failed to close plc inserter", "error", err)
178 return
179 }
180 p.logger.Info("plc inserter closed")
181 }()
182 }
183
184 p.wg.Wait()
185
186 cancel()
187
188 p.logger.Info("inserters closed")
189 }
190
191 return nil
192}