1package photocopy
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "net/http"
8 "sync"
9 "time"
10
11 "github.com/ClickHouse/clickhouse-go/v2"
12 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
13 "github.com/haileyok/photocopy/clickhouse_inserter"
14 "github.com/prometheus/client_golang/prometheus"
15 "github.com/prometheus/client_golang/prometheus/promauto"
16 "github.com/prometheus/client_golang/prometheus/promhttp"
17)
18
19type Photocopy struct {
20 logger *slog.Logger
21 wg sync.WaitGroup
22
23 relayHost string
24 cursor string
25 cursorFile string
26 metricsAddr string
27
28 inserters *Inserters
29
30 plcScraper *PLCScraper
31
32 ratelimitBypassKey string
33
34 conn driver.Conn
35}
36
37type Inserters struct {
38 followsInserter *clickhouse_inserter.Inserter
39 interactionsInserter *clickhouse_inserter.Inserter
40 postsInserter *clickhouse_inserter.Inserter
41 plcInserter *clickhouse_inserter.Inserter
42 recordsInserter *clickhouse_inserter.Inserter
43 deletesInserter *clickhouse_inserter.Inserter
44}
45
// Args holds the configuration consumed by New.
type Args struct {
	// Logger is shared by Photocopy and every component New creates.
	Logger *slog.Logger

	// RelayHost is the relay to consume from, MetricsAddr the listen address
	// of the Prometheus /metrics endpoint, CursorFile the relay cursor file and
	// PLCScraperCursorFile the cursor file handed to the PLC scraper.
	RelayHost, MetricsAddr, CursorFile, PLCScraperCursorFile string

	// ClickHouse connection parameters.
	ClickhouseAddr, ClickhouseDatabase, ClickhouseUser, ClickhousePass string

	// RatelimitBypassKey is stored on the resulting Photocopy.
	RatelimitBypassKey string
}
58
59func New(ctx context.Context, args *Args) (*Photocopy, error) {
60 conn, err := clickhouse.Open(&clickhouse.Options{
61 Addr: []string{args.ClickhouseAddr},
62 Auth: clickhouse.Auth{
63 Database: args.ClickhouseDatabase,
64 Username: args.ClickhouseUser,
65 Password: args.ClickhousePass,
66 },
67 })
68 if err != nil {
69 return nil, err
70 }
71
72 p := &Photocopy{
73 logger: args.Logger,
74 metricsAddr: args.MetricsAddr,
75 relayHost: args.RelayHost,
76 wg: sync.WaitGroup{},
77 cursorFile: args.CursorFile,
78 ratelimitBypassKey: args.RatelimitBypassKey,
79 conn: conn,
80 }
81
82 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{
83 Name: "photocopy_inserts_time",
84 Help: "histogram of photocopy inserts",
85 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20),
86 }, []string{"type"})
87
88 fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
89 PrometheusCounterPrefix: "photocopy_follows",
90 Histogram: insertionsHist,
91 BatchSize: 2500,
92 Logger: p.logger,
93 Conn: conn,
94 Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)",
95 RateLimit: 3,
96 })
97 if err != nil {
98 return nil, err
99 }
100
101 pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
102 PrometheusCounterPrefix: "photocopy_posts",
103 Histogram: insertionsHist,
104 BatchSize: 2500,
105 Logger: p.logger,
106 Conn: conn,
107 Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did, lang)",
108 RateLimit: 3,
109 })
110 if err != nil {
111 return nil, err
112 }
113
114 ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
115 PrometheusCounterPrefix: "photocopy_interactions",
116 Histogram: insertionsHist,
117 BatchSize: 2500,
118 Logger: p.logger,
119 Conn: conn,
120 Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)",
121 RateLimit: 3,
122 })
123 if err != nil {
124 return nil, err
125 }
126
127 ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
128 PrometheusCounterPrefix: "photocopy_records",
129 Histogram: insertionsHist,
130 BatchSize: 2500,
131 Logger: p.logger,
132 Conn: conn,
133 Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)",
134 RateLimit: 3,
135 })
136 if err != nil {
137 return nil, err
138 }
139
140 di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
141 PrometheusCounterPrefix: "photocopy_deletes",
142 Histogram: insertionsHist,
143 BatchSize: 2500,
144 Logger: p.logger,
145 Conn: conn,
146 Query: "INSERT INTO delete (did, rkey, created_at)",
147 RateLimit: 3,
148 })
149 if err != nil {
150 return nil, err
151 }
152
153 is := &Inserters{
154 followsInserter: fi,
155 postsInserter: pi,
156 interactionsInserter: ii,
157 recordsInserter: ri,
158 deletesInserter: di,
159 }
160
161 p.inserters = is
162
163 plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
164 PrometheusCounterPrefix: "photocopy_plc_entries",
165 Histogram: insertionsHist,
166 BatchSize: 100,
167 Logger: args.Logger,
168 Conn: conn,
169 Query: `INSERT INTO plc (
170 did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type,
171 plc_op_services, plc_op_also_known_as, plc_op_rotation_keys,
172 plc_tomb_sig, plc_tomb_prev, plc_tomb_type,
173 legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle,
174 legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key
175 )`,
176 })
177 if err != nil {
178 return nil, err
179 }
180
181 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{
182 Logger: p.logger,
183 Inserter: plci,
184 CursorFile: args.PLCScraperCursorFile,
185 })
186 if err != nil {
187 return nil, err
188 }
189
190 p.inserters.plcInserter = plci
191 p.plcScraper = plcs
192
193 return p, nil
194}
195
196func (p *Photocopy) Run(baseCtx context.Context, withBackfill bool) error {
197 ctx, cancel := context.WithCancel(baseCtx)
198
199 metricsServer := http.NewServeMux()
200 metricsServer.Handle("/metrics", promhttp.Handler())
201
202 go func() {
203 p.logger.Info("Starting metrics server")
204 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil {
205 p.logger.Error("metrics server failed", "error", err)
206 }
207 }()
208
209 go func(ctx context.Context, cancel context.CancelFunc) {
210 p.logger.Info("starting relay", "relayHost", p.relayHost)
211 if err := p.startConsumer(ctx, cancel); err != nil {
212 panic(fmt.Errorf("failed to start consumer: %w", err))
213 }
214 }(ctx, cancel)
215
216 go func(ctx context.Context) {
217 if err := p.plcScraper.Run(ctx); err != nil {
218 panic(fmt.Errorf("failed to start plc scraper: %w", err))
219 }
220 }(ctx)
221
222 if withBackfill {
223 go func(ctx context.Context) {
224 if err := p.runBackfiller(ctx); err != nil {
225 panic(fmt.Errorf("error starting backfiller: %w", err))
226 }
227 }(ctx)
228 }
229
230 <-ctx.Done()
231
232 if p.inserters != nil {
233 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
234
235 p.logger.Info("stopping inserters")
236
237 if p.inserters.followsInserter != nil {
238 p.wg.Add(1)
239 go func() {
240 defer p.wg.Done()
241 if err := p.inserters.followsInserter.Close(ctx); err != nil {
242 p.logger.Error("failed to close follows inserter", "error", err)
243 return
244 }
245 p.logger.Info("follows inserter closed")
246 }()
247 }
248
249 if p.inserters.interactionsInserter != nil {
250 p.wg.Add(1)
251 go func() {
252 defer p.wg.Done()
253 if err := p.inserters.interactionsInserter.Close(ctx); err != nil {
254 p.logger.Error("failed to close interactions inserter", "error", err)
255 return
256 }
257 p.logger.Info("interactions inserter closed")
258 }()
259 }
260
261 if p.inserters.postsInserter != nil {
262 p.wg.Add(1)
263 go func() {
264 defer p.wg.Done()
265 if err := p.inserters.postsInserter.Close(ctx); err != nil {
266 p.logger.Error("failed to close posts inserter", "error", err)
267 return
268 }
269 p.logger.Info("posts inserter closed")
270 }()
271 }
272
273 if p.inserters.recordsInserter != nil {
274 p.wg.Add(1)
275 go func() {
276 defer p.wg.Done()
277 if err := p.inserters.recordsInserter.Close(ctx); err != nil {
278 p.logger.Error("failed to close records inserter", "error", err)
279 return
280 }
281 p.logger.Info("records inserter closed")
282 }()
283 }
284
285 if p.inserters.deletesInserter != nil {
286 p.wg.Add(1)
287 go func() {
288 defer p.wg.Done()
289 if err := p.inserters.deletesInserter.Close(ctx); err != nil {
290 p.logger.Error("failed to close deletes inserter", "error", err)
291 return
292 }
293 p.logger.Info("deletes inserter closed")
294 }()
295 }
296
297 if p.inserters.plcInserter != nil {
298 p.wg.Add(1)
299 go func() {
300 defer p.wg.Done()
301 if err := p.inserters.plcInserter.Close(ctx); err != nil {
302 p.logger.Error("failed to close plc inserter", "error", err)
303 return
304 }
305 p.logger.Info("plc inserter closed")
306 }()
307 }
308
309 p.wg.Wait()
310
311 cancel()
312
313 p.logger.Info("inserters closed")
314 }
315
316 return nil
317}