1package photocopy
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "net/http"
8 "sync"
9 "time"
10
11 "github.com/ClickHouse/clickhouse-go/v2"
12 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
13 "github.com/haileyok/photocopy/clickhouse_inserter"
14 "github.com/prometheus/client_golang/prometheus"
15 "github.com/prometheus/client_golang/prometheus/promauto"
16 "github.com/prometheus/client_golang/prometheus/promhttp"
17)
18
19type Photocopy struct {
20 logger *slog.Logger
21 wg sync.WaitGroup
22
23 relayHost string
24 cursor string
25 cursorFile string
26 metricsAddr string
27
28 inserters *Inserters
29
30 plcScraper *PLCScraper
31
32 ratelimitBypassKey string
33
34 conn driver.Conn
35
36 nervanaClient *http.Client
37 nervanaEndpoint string
38 nervanaApiKey string
39}
40
41type Inserters struct {
42 followsInserter *clickhouse_inserter.Inserter
43 interactionsInserter *clickhouse_inserter.Inserter
44 postsInserter *clickhouse_inserter.Inserter
45 plcInserter *clickhouse_inserter.Inserter
46 recordsInserter *clickhouse_inserter.Inserter
47 deletesInserter *clickhouse_inserter.Inserter
48 labelsInserter *clickhouse_inserter.Inserter
49}
50
// Args carries the configuration consumed by New.
type Args struct {
	// Logger is used by Photocopy and passed on to the inserters and the PLC
	// scraper.
	Logger *slog.Logger

	// RelayHost is the relay the consumer is started against.
	RelayHost string

	// MetricsAddr is the listen address of the Prometheus metrics server.
	MetricsAddr string

	// CursorFile is stored on Photocopy as its cursor file.
	CursorFile string

	// PLCScraperCursorFile is forwarded to the PLC scraper as its cursor file.
	PLCScraperCursorFile string

	// ClickhouseAddr is the single ClickHouse address to connect to.
	ClickhouseAddr string

	// ClickhouseDatabase is the database name used for authentication.
	ClickhouseDatabase string

	// ClickhouseUser is the ClickHouse user name.
	ClickhouseUser string

	// ClickhousePass is the ClickHouse password.
	ClickhousePass string

	// RatelimitBypassKey is stored on Photocopy unchanged.
	RatelimitBypassKey string

	// NervanaEndpoint and NervanaApiKey enable the Nervana HTTP client; New
	// configures it only when both are non-empty.
	NervanaEndpoint string

	NervanaApiKey string
}
65
66func New(ctx context.Context, args *Args) (*Photocopy, error) {
67 conn, err := clickhouse.Open(&clickhouse.Options{
68 Addr: []string{args.ClickhouseAddr},
69 Auth: clickhouse.Auth{
70 Database: args.ClickhouseDatabase,
71 Username: args.ClickhouseUser,
72 Password: args.ClickhousePass,
73 },
74 })
75 if err != nil {
76 return nil, err
77 }
78
79 p := &Photocopy{
80 logger: args.Logger,
81 metricsAddr: args.MetricsAddr,
82 relayHost: args.RelayHost,
83 wg: sync.WaitGroup{},
84 cursorFile: args.CursorFile,
85 ratelimitBypassKey: args.RatelimitBypassKey,
86 conn: conn,
87 }
88
89 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{
90 Name: "photocopy_inserts_time",
91 Help: "histogram of photocopy inserts",
92 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20),
93 }, []string{"type"})
94
95 fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
96 PrometheusCounterPrefix: "photocopy_follows",
97 Histogram: insertionsHist,
98 BatchSize: 500,
99 Logger: p.logger,
100 Conn: conn,
101 Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)",
102 RateLimit: 3,
103 })
104 if err != nil {
105 return nil, err
106 }
107
108 pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
109 PrometheusCounterPrefix: "photocopy_posts",
110 Histogram: insertionsHist,
111 BatchSize: 300,
112 Logger: p.logger,
113 Conn: conn,
114 Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did, lang)",
115 RateLimit: 3,
116 })
117 if err != nil {
118 return nil, err
119 }
120
121 ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
122 PrometheusCounterPrefix: "photocopy_interactions",
123 Histogram: insertionsHist,
124 BatchSize: 1000,
125 Logger: p.logger,
126 Conn: conn,
127 Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)",
128 RateLimit: 3,
129 })
130 if err != nil {
131 return nil, err
132 }
133
134 ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
135 PrometheusCounterPrefix: "photocopy_records",
136 Histogram: insertionsHist,
137 BatchSize: 2500,
138 Logger: p.logger,
139 Conn: conn,
140 Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)",
141 RateLimit: 3,
142 })
143 if err != nil {
144 return nil, err
145 }
146
147 di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
148 PrometheusCounterPrefix: "photocopy_deletes",
149 Histogram: insertionsHist,
150 BatchSize: 500,
151 Logger: p.logger,
152 Conn: conn,
153 Query: "INSERT INTO delete (did, rkey, created_at)",
154 RateLimit: 3,
155 })
156 if err != nil {
157 return nil, err
158 }
159
160 li, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
161 PrometheusCounterPrefix: "photocopy_labels",
162 Histogram: insertionsHist,
163 BatchSize: 100,
164 Logger: p.logger,
165 Conn: conn,
166 Query: "INSERT INTO post_label (did, rkey, text, label, entity_id, description, topic, created_at)",
167 RateLimit: 3,
168 })
169 if err != nil {
170 return nil, err
171 }
172
173 is := &Inserters{
174 followsInserter: fi,
175 postsInserter: pi,
176 interactionsInserter: ii,
177 recordsInserter: ri,
178 deletesInserter: di,
179 labelsInserter: li,
180 }
181
182 p.inserters = is
183
184 plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
185 PrometheusCounterPrefix: "photocopy_plc_entries",
186 Histogram: insertionsHist,
187 BatchSize: 100,
188 Logger: args.Logger,
189 Conn: conn,
190 Query: `INSERT INTO plc (
191 did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type,
192 plc_op_services, plc_op_also_known_as, plc_op_rotation_keys,
193 plc_tomb_sig, plc_tomb_prev, plc_tomb_type,
194 legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle,
195 legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key
196 )`,
197 })
198 if err != nil {
199 return nil, err
200 }
201
202 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{
203 Logger: p.logger,
204 Inserter: plci,
205 CursorFile: args.PLCScraperCursorFile,
206 })
207 if err != nil {
208 return nil, err
209 }
210
211 p.inserters.plcInserter = plci
212 p.plcScraper = plcs
213
214 if args.NervanaApiKey != "" && args.NervanaEndpoint != "" {
215 p.nervanaClient = &http.Client{
216 Timeout: 5 * time.Second,
217 }
218 p.nervanaEndpoint = args.NervanaEndpoint
219 p.nervanaApiKey = args.NervanaApiKey
220 }
221
222 return p, nil
223}
224
225func (p *Photocopy) Run(baseCtx context.Context, withBackfill bool) error {
226 ctx, cancel := context.WithCancel(baseCtx)
227
228 metricsServer := http.NewServeMux()
229 metricsServer.Handle("/metrics", promhttp.Handler())
230
231 go func() {
232 p.logger.Info("Starting metrics server")
233 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil {
234 p.logger.Error("metrics server failed", "error", err)
235 }
236 }()
237
238 go func(ctx context.Context, cancel context.CancelFunc) {
239 p.logger.Info("starting relay", "relayHost", p.relayHost)
240 if err := p.startConsumer(ctx, cancel); err != nil {
241 panic(fmt.Errorf("failed to start consumer: %w", err))
242 }
243 }(ctx, cancel)
244
245 go func(ctx context.Context) {
246 if err := p.plcScraper.Run(ctx); err != nil {
247 panic(fmt.Errorf("failed to start plc scraper: %w", err))
248 }
249 }(ctx)
250
251 if withBackfill {
252 go func(ctx context.Context) {
253 if err := p.runBackfiller(ctx); err != nil {
254 panic(fmt.Errorf("error starting backfiller: %w", err))
255 }
256 }(ctx)
257 }
258
259 <-ctx.Done()
260
261 if p.inserters != nil {
262 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
263
264 p.logger.Info("stopping inserters")
265
266 if p.inserters.followsInserter != nil {
267 p.wg.Add(1)
268 go func() {
269 defer p.wg.Done()
270 if err := p.inserters.followsInserter.Close(ctx); err != nil {
271 p.logger.Error("failed to close follows inserter", "error", err)
272 return
273 }
274 p.logger.Info("follows inserter closed")
275 }()
276 }
277
278 if p.inserters.interactionsInserter != nil {
279 p.wg.Add(1)
280 go func() {
281 defer p.wg.Done()
282 if err := p.inserters.interactionsInserter.Close(ctx); err != nil {
283 p.logger.Error("failed to close interactions inserter", "error", err)
284 return
285 }
286 p.logger.Info("interactions inserter closed")
287 }()
288 }
289
290 if p.inserters.postsInserter != nil {
291 p.wg.Add(1)
292 go func() {
293 defer p.wg.Done()
294 if err := p.inserters.postsInserter.Close(ctx); err != nil {
295 p.logger.Error("failed to close posts inserter", "error", err)
296 return
297 }
298 p.logger.Info("posts inserter closed")
299 }()
300 }
301
302 if p.inserters.recordsInserter != nil {
303 p.wg.Add(1)
304 go func() {
305 defer p.wg.Done()
306 if err := p.inserters.recordsInserter.Close(ctx); err != nil {
307 p.logger.Error("failed to close records inserter", "error", err)
308 return
309 }
310 p.logger.Info("records inserter closed")
311 }()
312 }
313
314 if p.inserters.deletesInserter != nil {
315 p.wg.Add(1)
316 go func() {
317 defer p.wg.Done()
318 if err := p.inserters.deletesInserter.Close(ctx); err != nil {
319 p.logger.Error("failed to close deletes inserter", "error", err)
320 return
321 }
322 p.logger.Info("deletes inserter closed")
323 }()
324 }
325
326 if p.inserters.plcInserter != nil {
327 p.wg.Add(1)
328 go func() {
329 defer p.wg.Done()
330 if err := p.inserters.plcInserter.Close(ctx); err != nil {
331 p.logger.Error("failed to close plc inserter", "error", err)
332 return
333 }
334 p.logger.Info("plc inserter closed")
335 }()
336 }
337
338 p.wg.Wait()
339
340 cancel()
341
342 p.logger.Info("inserters closed")
343 }
344
345 return nil
346}