package photocopy

import (
	"context"
	"fmt"
	"log/slog"
	"net/http"
	"sync"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/haileyok/photocopy/clickhouse_inserter"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
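// Photocopy consumes events from the relay firehose and the PLC directory,
// writing them to ClickHouse through a set of batched inserters.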
type Photocopy struct {
	logger *slog.Logger
	wg     sync.WaitGroup

	relayHost   string
	cursor      string
	cursorFile  string
	metricsAddr string

	inserters *Inserters

	plcScraper *PLCScraper
}
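// Inserters holds one batched ClickHouse inserter per destination table.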
type Inserters struct {
	followsInserter      *clickhouse_inserter.Inserter
	interactionsInserter *clickhouse_inserter.Inserter
	postsInserter        *clickhouse_inserter.Inserter
	plcInserter          *clickhouse_inserter.Inserter
	recordsInserter      *clickhouse_inserter.Inserter
	deletesInserter      *clickhouse_inserter.Inserter
}
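// Args is the configuration used to construct a Photocopy instance.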
type Args struct {
	Logger               *slog.Logger
	RelayHost            string
	MetricsAddr          string
	CursorFile           string
	PLCScraperCursorFile string
	ClickhouseAddr       string
	ClickhouseDatabase   string
	ClickhouseUser       string
	ClickhousePass       string
}
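// New opens a ClickHouse connection, wires up one inserter per destination
// table, and constructs the PLC scraper. It does not start any background
// work; call Run to begin consuming. A rough usage sketch (the field values
// below are placeholders, not required settings):
//
//	p, err := photocopy.New(ctx, &photocopy.Args{
//		Logger:               slog.Default(),
//		RelayHost:            "wss://relay.example.com",
//		MetricsAddr:          ":2112",
//		CursorFile:           "cursor",
//		PLCScraperCursorFile: "plc-cursor",
//		ClickhouseAddr:       "localhost:9000",
//		ClickhouseDatabase:   "default",
//		ClickhouseUser:       "default",
//		ClickhousePass:       "",
//	})
//	if err != nil {
//		// handle error
//	}
//	if err := p.Run(ctx); err != nil {
//		// handle error
//	}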
func New(ctx context.Context, args *Args) (*Photocopy, error) {
	p := &Photocopy{
		logger:      args.Logger,
		metricsAddr: args.MetricsAddr,
		relayHost:   args.RelayHost,
		wg:          sync.WaitGroup{},
		cursorFile:  args.CursorFile,
	}

	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{args.ClickhouseAddr},
		Auth: clickhouse.Auth{
			Database: args.ClickhouseDatabase,
			Username: args.ClickhouseUser,
			Password: args.ClickhousePass,
		},
	})
	if err != nil {
		return nil, err
	}
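	// A single latency histogram is shared by all inserters, labeled by record type.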
	insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "photocopy_inserts_time",
		Help:    "histogram of photocopy inserts",
		Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20),
	}, []string{"type"})
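	// Each Query is only the target column list; the clickhouse_inserter
	// package presumably prepares batches from it (the form clickhouse-go's
	// batch API accepts) and flushes once BatchSize rows have accumulated.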
	fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
		PrometheusCounterPrefix: "photocopy_follows",
		Histogram:               insertionsHist,
		BatchSize:               1000,
		Logger:                  p.logger,
		Conn:                    conn,
		Query:                   "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)",
	})
	if err != nil {
		return nil, err
	}

	pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
		PrometheusCounterPrefix: "photocopy_posts",
		Histogram:               insertionsHist,
		BatchSize:               100,
		Logger:                  p.logger,
		Conn:                    conn,
		Query:                   "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did)",
	})
	if err != nil {
		return nil, err
	}

	ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
		PrometheusCounterPrefix: "photocopy_interactions",
		Histogram:               insertionsHist,
		BatchSize:               1000,
		Logger:                  p.logger,
		Conn:                    conn,
		Query:                   "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)",
	})
	if err != nil {
		return nil, err
	}

	ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
		PrometheusCounterPrefix: "photocopy_records",
		Histogram:               insertionsHist,
		BatchSize:               1000,
		Logger:                  p.logger,
		Conn:                    conn,
		Query:                   "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)",
	})
	if err != nil {
		return nil, err
	}

	di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
		PrometheusCounterPrefix: "photocopy_deletes",
		Histogram:               insertionsHist,
		BatchSize:               100,
		Logger:                  p.logger,
		Conn:                    conn,
		Query:                   "INSERT INTO delete (did, rkey, created_at)",
	})
	if err != nil {
		return nil, err
	}

	is := &Inserters{
		followsInserter:      fi,
		postsInserter:        pi,
		interactionsInserter: ii,
		recordsInserter:      ri,
		deletesInserter:      di,
	}

	p.inserters = is
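	// The PLC inserter is built separately from the firehose inserters and is
	// handed to the PLC scraper, which produces its rows.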
	plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
		PrometheusCounterPrefix: "photocopy_plc_entries",
		Histogram:               insertionsHist,
		BatchSize:               100,
		Logger:                  args.Logger,
		Conn:                    conn,
		Query: `INSERT INTO plc (
			did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type,
			plc_op_services, plc_op_also_known_as, plc_op_rotation_keys,
			plc_tomb_sig, plc_tomb_prev, plc_tomb_type,
			legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle,
			legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key
		)`,
	})
	if err != nil {
		return nil, err
	}

	plcs, err := NewPLCScraper(ctx, PLCScraperArgs{
		Logger:     p.logger,
		Inserter:   plci,
		CursorFile: args.PLCScraperCursorFile,
	})
	if err != nil {
		return nil, err
	}

	p.inserters.plcInserter = plci
	p.plcScraper = plcs

	return p, nil
}
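// Run starts the metrics server, the firehose consumer, and the PLC scraper,
// then blocks until the context is cancelled, after which it closes all
// inserters before returning.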
func (p *Photocopy) Run(baseCtx context.Context) error {
	ctx, cancel := context.WithCancel(baseCtx)
	defer cancel()
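	// Expose Prometheus metrics on a dedicated listener.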
	metricsServer := http.NewServeMux()
	metricsServer.Handle("/metrics", promhttp.Handler())

	go func() {
		p.logger.Info("Starting metrics server")
		if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil {
			p.logger.Error("metrics server failed", "error", err)
		}
	}()
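	// The consumer and the PLC scraper run until the context is cancelled; a
	// failure to start either one is treated as fatal.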
	go func(ctx context.Context, cancel context.CancelFunc) {
		p.logger.Info("starting relay", "relayHost", p.relayHost)
		if err := p.startConsumer(ctx, cancel); err != nil {
			panic(fmt.Errorf("failed to start consumer: %w", err))
		}
	}(ctx, cancel)

	go func(ctx context.Context) {
		if err := p.plcScraper.Run(ctx); err != nil {
			panic(fmt.Errorf("failed to start plc scraper: %w", err))
		}
	}(ctx)

	<-ctx.Done()
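	// Once the context is done, give each inserter up to 15 seconds to close
	// (and, presumably, flush any buffered rows).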
	if p.inserters != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)

		p.logger.Info("stopping inserters")

		closers := []struct {
			name string
			ins  *clickhouse_inserter.Inserter
		}{
			{"follows", p.inserters.followsInserter},
			{"interactions", p.inserters.interactionsInserter},
			{"posts", p.inserters.postsInserter},
			{"records", p.inserters.recordsInserter},
			{"deletes", p.inserters.deletesInserter},
			{"plc", p.inserters.plcInserter},
		}

		for _, c := range closers {
			if c.ins == nil {
				continue
			}
			p.wg.Add(1)
			go func(name string, ins *clickhouse_inserter.Inserter) {
				defer p.wg.Done()
				if err := ins.Close(ctx); err != nil {
					p.logger.Error("failed to close "+name+" inserter", "error", err)
					return
				}
				p.logger.Info(name + " inserter closed")
			}(c.name, c.ins)
		}

		p.wg.Wait()

		cancel()

		p.logger.Info("inserters closed")
	}

	return nil
}