1package photocopy
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "net/http"
8 "sync"
9 "time"
10
11 "github.com/ClickHouse/clickhouse-go/v2"
12 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
13 "github.com/haileyok/photocopy/clickhouse_inserter"
14 "github.com/haileyok/photocopy/nervana"
15 "github.com/prometheus/client_golang/prometheus"
16 "github.com/prometheus/client_golang/prometheus/promauto"
17 "github.com/prometheus/client_golang/prometheus/promhttp"
18)
19
20type Photocopy struct {
21 logger *slog.Logger
22 wg sync.WaitGroup
23
24 relayHost string
25 cursor string
26 cursorFile string
27 metricsAddr string
28
29 inserters *Inserters
30
31 plcScraper *PLCScraper
32
33 ratelimitBypassKey string
34
35 conn driver.Conn
36
37 nervanaClient *nervana.Client
38 nervanaEndpoint string
39 nervanaApiKey string
40}
41
42type Inserters struct {
43 followsInserter *clickhouse_inserter.Inserter
44 interactionsInserter *clickhouse_inserter.Inserter
45 postsInserter *clickhouse_inserter.Inserter
46 plcInserter *clickhouse_inserter.Inserter
47 recordsInserter *clickhouse_inserter.Inserter
48 deletesInserter *clickhouse_inserter.Inserter
49 labelsInserter *clickhouse_inserter.Inserter
50}
51
// Args carries the configuration consumed by New.
type Args struct {
	// Logger is used by Photocopy and is passed on to every inserter and to
	// the PLC scraper.
	Logger *slog.Logger

	// RelayHost is the relay the consumer connects to. MetricsAddr is the
	// listen address of the Prometheus metrics HTTP server. CursorFile and
	// PLCScraperCursorFile are the cursor file paths given to Photocopy and
	// to the PLC scraper respectively.
	RelayHost, MetricsAddr, CursorFile, PLCScraperCursorFile string

	// ClickHouse connection settings, passed to clickhouse.Open.
	ClickhouseAddr, ClickhouseDatabase, ClickhouseUser, ClickhousePass string

	// RatelimitBypassKey is stored on Photocopy unchanged.
	RatelimitBypassKey string

	// NervanaEndpoint and NervanaApiKey enable the Nervana client; the client
	// is only created when both are non-empty.
	NervanaEndpoint, NervanaApiKey string
}
66
67func New(ctx context.Context, args *Args) (*Photocopy, error) {
68 conn, err := clickhouse.Open(&clickhouse.Options{
69 Addr: []string{args.ClickhouseAddr},
70 Auth: clickhouse.Auth{
71 Database: args.ClickhouseDatabase,
72 Username: args.ClickhouseUser,
73 Password: args.ClickhousePass,
74 },
75 })
76 if err != nil {
77 return nil, err
78 }
79
80 p := &Photocopy{
81 logger: args.Logger,
82 metricsAddr: args.MetricsAddr,
83 relayHost: args.RelayHost,
84 wg: sync.WaitGroup{},
85 cursorFile: args.CursorFile,
86 ratelimitBypassKey: args.RatelimitBypassKey,
87 conn: conn,
88 }
89
90 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{
91 Name: "photocopy_inserts_time",
92 Help: "histogram of photocopy inserts",
93 Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20),
94 }, []string{"type"})
95
96 fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
97 PrometheusCounterPrefix: "photocopy_follows",
98 Histogram: insertionsHist,
99 BatchSize: 500,
100 Logger: p.logger,
101 Conn: conn,
102 Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)",
103 RateLimit: 3,
104 })
105 if err != nil {
106 return nil, err
107 }
108
109 pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
110 PrometheusCounterPrefix: "photocopy_posts",
111 Histogram: insertionsHist,
112 BatchSize: 300,
113 Logger: p.logger,
114 Conn: conn,
115 Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did, lang)",
116 RateLimit: 3,
117 })
118 if err != nil {
119 return nil, err
120 }
121
122 ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
123 PrometheusCounterPrefix: "photocopy_interactions",
124 Histogram: insertionsHist,
125 BatchSize: 1000,
126 Logger: p.logger,
127 Conn: conn,
128 Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)",
129 RateLimit: 3,
130 })
131 if err != nil {
132 return nil, err
133 }
134
135 ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
136 PrometheusCounterPrefix: "photocopy_records",
137 Histogram: insertionsHist,
138 BatchSize: 2500,
139 Logger: p.logger,
140 Conn: conn,
141 Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)",
142 RateLimit: 3,
143 })
144 if err != nil {
145 return nil, err
146 }
147
148 di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
149 PrometheusCounterPrefix: "photocopy_deletes",
150 Histogram: insertionsHist,
151 BatchSize: 500,
152 Logger: p.logger,
153 Conn: conn,
154 Query: "INSERT INTO delete (did, rkey, created_at)",
155 RateLimit: 3,
156 })
157 if err != nil {
158 return nil, err
159 }
160
161 li, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
162 PrometheusCounterPrefix: "photocopy_labels",
163 Histogram: insertionsHist,
164 BatchSize: 100,
165 Logger: p.logger,
166 Conn: conn,
167 Query: "INSERT INTO post_label (did, rkey, text, label, entity_id, description, topic, created_at)",
168 RateLimit: 3,
169 })
170 if err != nil {
171 return nil, err
172 }
173
174 is := &Inserters{
175 followsInserter: fi,
176 postsInserter: pi,
177 interactionsInserter: ii,
178 recordsInserter: ri,
179 deletesInserter: di,
180 labelsInserter: li,
181 }
182
183 p.inserters = is
184
185 plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
186 PrometheusCounterPrefix: "photocopy_plc_entries",
187 Histogram: insertionsHist,
188 BatchSize: 100,
189 Logger: args.Logger,
190 Conn: conn,
191 Query: `INSERT INTO plc (
192 did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type,
193 plc_op_services, plc_op_also_known_as, plc_op_rotation_keys,
194 plc_tomb_sig, plc_tomb_prev, plc_tomb_type,
195 legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle,
196 legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key
197 )`,
198 })
199 if err != nil {
200 return nil, err
201 }
202
203 plcs, err := NewPLCScraper(ctx, PLCScraperArgs{
204 Logger: p.logger,
205 Inserter: plci,
206 CursorFile: args.PLCScraperCursorFile,
207 })
208 if err != nil {
209 return nil, err
210 }
211
212 p.inserters.plcInserter = plci
213 p.plcScraper = plcs
214
215 if args.NervanaApiKey != "" && args.NervanaEndpoint != "" {
216 p.nervanaClient = nervana.NewClient(args.NervanaEndpoint, args.NervanaApiKey)
217 }
218
219 return p, nil
220}
221
222func (p *Photocopy) Run(baseCtx context.Context, withBackfill bool) error {
223 ctx, cancel := context.WithCancel(baseCtx)
224
225 metricsServer := http.NewServeMux()
226 metricsServer.Handle("/metrics", promhttp.Handler())
227
228 go func() {
229 p.logger.Info("Starting metrics server")
230 if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil {
231 p.logger.Error("metrics server failed", "error", err)
232 }
233 }()
234
235 go func(ctx context.Context, cancel context.CancelFunc) {
236 p.logger.Info("starting relay", "relayHost", p.relayHost)
237 if err := p.startConsumer(ctx, cancel); err != nil {
238 panic(fmt.Errorf("failed to start consumer: %w", err))
239 }
240 }(ctx, cancel)
241
242 go func(ctx context.Context) {
243 if err := p.plcScraper.Run(ctx); err != nil {
244 panic(fmt.Errorf("failed to start plc scraper: %w", err))
245 }
246 }(ctx)
247
248 if withBackfill {
249 go func(ctx context.Context) {
250 if err := p.runBackfiller(ctx); err != nil {
251 panic(fmt.Errorf("error starting backfiller: %w", err))
252 }
253 }(ctx)
254 }
255
256 <-ctx.Done()
257
258 if p.inserters != nil {
259 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
260
261 p.logger.Info("stopping inserters")
262
263 if p.inserters.followsInserter != nil {
264 p.wg.Add(1)
265 go func() {
266 defer p.wg.Done()
267 if err := p.inserters.followsInserter.Close(ctx); err != nil {
268 p.logger.Error("failed to close follows inserter", "error", err)
269 return
270 }
271 p.logger.Info("follows inserter closed")
272 }()
273 }
274
275 if p.inserters.interactionsInserter != nil {
276 p.wg.Add(1)
277 go func() {
278 defer p.wg.Done()
279 if err := p.inserters.interactionsInserter.Close(ctx); err != nil {
280 p.logger.Error("failed to close interactions inserter", "error", err)
281 return
282 }
283 p.logger.Info("interactions inserter closed")
284 }()
285 }
286
287 if p.inserters.postsInserter != nil {
288 p.wg.Add(1)
289 go func() {
290 defer p.wg.Done()
291 if err := p.inserters.postsInserter.Close(ctx); err != nil {
292 p.logger.Error("failed to close posts inserter", "error", err)
293 return
294 }
295 p.logger.Info("posts inserter closed")
296 }()
297 }
298
299 if p.inserters.recordsInserter != nil {
300 p.wg.Add(1)
301 go func() {
302 defer p.wg.Done()
303 if err := p.inserters.recordsInserter.Close(ctx); err != nil {
304 p.logger.Error("failed to close records inserter", "error", err)
305 return
306 }
307 p.logger.Info("records inserter closed")
308 }()
309 }
310
311 if p.inserters.deletesInserter != nil {
312 p.wg.Add(1)
313 go func() {
314 defer p.wg.Done()
315 if err := p.inserters.deletesInserter.Close(ctx); err != nil {
316 p.logger.Error("failed to close deletes inserter", "error", err)
317 return
318 }
319 p.logger.Info("deletes inserter closed")
320 }()
321 }
322
323 if p.inserters.plcInserter != nil {
324 p.wg.Add(1)
325 go func() {
326 defer p.wg.Done()
327 if err := p.inserters.plcInserter.Close(ctx); err != nil {
328 p.logger.Error("failed to close plc inserter", "error", err)
329 return
330 }
331 p.logger.Info("plc inserter closed")
332 }()
333 }
334
335 p.wg.Wait()
336
337 cancel()
338
339 p.logger.Info("inserters closed")
340 }
341
342 return nil
343}