package photocopy

import (
	"context"
	"fmt"
	"log/slog"
	"net/http"
	"sync"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/haileyok/photocopy/clickhouse_inserter"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
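
// Photocopy consumes records from a relay and from the PLC scraper and
// writes them to ClickHouse through a set of batching inserters.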
type Photocopy struct {
	logger *slog.Logger
	wg     sync.WaitGroup

	relayHost   string
	cursor      string
	cursorFile  string
	metricsAddr string

	inserters *Inserters

	plcScraper *PLCScraper
}
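
// Inserters holds one ClickHouse inserter per destination table.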
type Inserters struct {
	followsInserter      *clickhouse_inserter.Inserter
	interactionsInserter *clickhouse_inserter.Inserter
	postsInserter        *clickhouse_inserter.Inserter
	plcInserter          *clickhouse_inserter.Inserter
}
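
// Args is the configuration required to construct a Photocopy.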
type Args struct {
	Logger               *slog.Logger
	RelayHost            string
	MetricsAddr          string
	CursorFile           string
	PLCScraperCursorFile string
	ClickhouseAddr       string
	ClickhouseDatabase   string
	ClickhouseUser       string
	ClickhousePass       string
}
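
// New opens a ClickHouse connection, builds the follow, post, interaction,
// and PLC inserters, and sets up the PLC scraper.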
func New(ctx context.Context, args *Args) (*Photocopy, error) {
	p := &Photocopy{
		logger:      args.Logger,
		metricsAddr: args.MetricsAddr,
		relayHost:   args.RelayHost,
		wg:          sync.WaitGroup{},
		cursorFile:  args.CursorFile,
	}

	// A single ClickHouse connection is shared by all of the inserters.
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{args.ClickhouseAddr},
		Auth: clickhouse.Auth{
			Database: args.ClickhouseDatabase,
			Username: args.ClickhouseUser,
			Password: args.ClickhousePass,
		},
	})
	if err != nil {
		return nil, err
	}

	// Shared histogram of insert durations, labeled by record type.
	insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "photocopy_inserts_time",
		Help:    "histogram of photocopy inserts",
		Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20),
	}, []string{"type"})

	fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
		PrometheusCounterPrefix: "photocopy_follows",
		Histogram:               insertionsHist,
		BatchSize:               1000,
		Logger:                  p.logger,
		Conn:                    conn,
		Query:                   "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)",
	})
	if err != nil {
		return nil, err
	}

	pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
		PrometheusCounterPrefix: "photocopy_posts",
		Histogram:               insertionsHist,
		BatchSize:               100,
		Logger:                  p.logger,
		Conn:                    conn,
		Query:                   "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did)",
	})
	if err != nil {
		return nil, err
	}

	ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
		PrometheusCounterPrefix: "photocopy_interactions",
		Histogram:               insertionsHist,
		BatchSize:               1000,
		Logger:                  p.logger,
		Conn:                    conn,
		Query:                   "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)",
	})
	if err != nil {
		return nil, err
	}

	is := &Inserters{
		followsInserter:      fi,
		postsInserter:        pi,
		interactionsInserter: ii,
	}

	p.inserters = is

	plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
		PrometheusCounterPrefix: "photocopy_plc_entries",
		Histogram:               insertionsHist,
		BatchSize:               100,
		Logger:                  args.Logger,
		Conn:                    conn,
		Query: `INSERT INTO plc (
			did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type,
			plc_op_services, plc_op_also_known_as, plc_op_rotation_keys,
			plc_tomb_sig, plc_tomb_prev, plc_tomb_type,
			legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle,
			legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key
		)`,
	})
	if err != nil {
		return nil, err
	}

	plcs, err := NewPLCScraper(ctx, PLCScraperArgs{
		Logger:     p.logger,
		Inserter:   plci,
		CursorFile: args.PLCScraperCursorFile,
	})
	if err != nil {
		return nil, err
	}

	p.inserters.plcInserter = plci
	p.plcScraper = plcs

	return p, nil
}
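
// Run starts the metrics endpoint, the relay consumer, and the PLC scraper,
// then blocks until the context is canceled and shuts the inserters down.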
func (p *Photocopy) Run(baseCtx context.Context) error {
	ctx, cancel := context.WithCancel(baseCtx)
	defer cancel()

	// Expose Prometheus metrics.
	metricsServer := http.NewServeMux()
	metricsServer.Handle("/metrics", promhttp.Handler())

	go func() {
		p.logger.Info("Starting metrics server")
		if err := http.ListenAndServe(p.metricsAddr, metricsServer); err != nil {
			p.logger.Error("metrics server failed", "error", err)
		}
	}()

	// Consume events from the relay; a failed start is fatal.
	go func(ctx context.Context, cancel context.CancelFunc) {
		p.logger.Info("starting relay", "relayHost", p.relayHost)
		if err := p.startConsumer(ctx, cancel); err != nil {
			panic(fmt.Errorf("failed to start consumer: %w", err))
		}
	}(ctx, cancel)

	go func(ctx context.Context) {
		if err := p.plcScraper.Run(ctx); err != nil {
			panic(fmt.Errorf("failed to start plc scraper: %w", err))
		}
	}(ctx)

	<-ctx.Done()

	if p.inserters != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)

		p.logger.Info("stopping inserters")

		if p.inserters.followsInserter != nil {
			p.wg.Add(1)
			go func() {
				defer p.wg.Done()
				if err := p.inserters.followsInserter.Close(ctx); err != nil {
					p.logger.Error("failed to close follows inserter", "error", err)
					return
				}
				p.logger.Info("follows inserter closed")
			}()
		}

		if p.inserters.postsInserter != nil {
			p.wg.Add(1)
			go func() {
				defer p.wg.Done()
				if err := p.inserters.postsInserter.Close(ctx); err != nil {
					p.logger.Error("failed to close posts inserter", "error", err)
					return
				}
				p.logger.Info("posts inserter closed")
			}()
		}

		if p.inserters.interactionsInserter != nil {
			p.wg.Add(1)
			go func() {
				defer p.wg.Done()
				if err := p.inserters.interactionsInserter.Close(ctx); err != nil {
					p.logger.Error("failed to close interactions inserter", "error", err)
					return
				}
				p.logger.Info("interactions inserter closed")
			}()
		}

		if p.inserters.plcInserter != nil {
			p.wg.Add(1)
			go func() {
				defer p.wg.Done()
				if err := p.inserters.plcInserter.Close(ctx); err != nil {
					p.logger.Error("failed to close plc inserter", "error", err)
					return
				}
				p.logger.Info("plc inserter closed")
			}()
		}

		p.wg.Wait()

		cancel()

		p.logger.Info("inserters closed")
	}

	return nil
}