plc.directory mirror
1package mirror
2
3import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "time"
11
12 "tangled.sh/seiso.moe/alethia.directory/ent"
13 "tangled.sh/seiso.moe/alethia.directory/ent/operation"
14 "tangled.sh/seiso.moe/alethia.directory/pkg/plc"
15)
16
17type PLCExportEntry struct {
18 DID string `json:"did"`
19 Operation plc.PLCOperation `json:"operation"`
20 CID string `json:"cid"`
21 Nullified bool `json:"nullified"`
22 CreatedAt string `json:"createdAt"`
23}
24
25type MirrorService struct {
26 client *ent.Client
27 logger *slog.Logger
28}
29
30func NewMirrorService(client *ent.Client, logger *slog.Logger) *MirrorService {
31 return &MirrorService{
32 client: client,
33 logger: logger,
34 }
35}
36
// handleRateLimit reports how long to back off after a rate-limited response.
//
// plc.directory does not send any rate limit headers, so a fixed five-minute
// pause is used for now.
func handleRateLimit() time.Duration {
	const backoff = 5 * time.Minute
	return backoff
}
42
43func (ms *MirrorService) SyncOnce(ctx context.Context) error {
44 var after time.Time
45
46 latestOperation, err := ms.client.Operation.Query().
47 Order(ent.Desc(operation.FieldCreatedAt)).
48 First(ctx)
49
50 if err != nil {
51 if ent.IsNotFound(err) {
52 ms.logger.Info("no existing operations found, starting from beginning")
53 after = time.Time{}
54 } else {
55 return fmt.Errorf("failed to query latest operation: %w", err)
56 }
57 } else {
58 after = latestOperation.CreatedAt
59 ms.logger.Debug("resuming sync from timestamp", "after", after.Format(time.RFC3339))
60 }
61
62 formatted := after.Format(time.RFC3339Nano)
63 url := fmt.Sprintf("https://plc.directory/export?count=1000&after=%s", formatted)
64
65 ms.logger.Debug("fetching operations", "url", url)
66
67 resp, err := http.Get(url)
68 if err != nil {
69 return fmt.Errorf("failed to fetch from PLC directory: %w", err)
70 }
71 defer resp.Body.Close()
72
73 if resp.StatusCode == 429 {
74 retryAfter := handleRateLimit()
75 ms.logger.Warn("rate limited, waiting before retry", "retry_after", retryAfter)
76 time.Sleep(retryAfter)
77 return nil
78 }
79
80 if resp.StatusCode != 200 {
81 return fmt.Errorf("unexpected HTTP status: %d %s", resp.StatusCode, resp.Status)
82 }
83
84 return ms.processResponse(ctx, resp)
85
86}
87
88func (ms *MirrorService) processResponse(ctx context.Context, resp *http.Response) error {
89 scanner := bufio.NewScanner(resp.Body)
90 lineCount := 0
91 processedCount := 0
92 var latestOperationTime time.Time
93
94 for scanner.Scan() {
95 lineCount++
96
97 var entry PLCExportEntry
98 if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
99 ms.logger.Error("failed to unmarshal entry", "line", lineCount, "error", err)
100 continue
101 }
102
103 operationTime, err := time.Parse(time.RFC3339Nano, entry.CreatedAt)
104 if err != nil {
105 continue
106 }
107
108 if operationTime.After(latestOperationTime) {
109 latestOperationTime = operationTime
110 }
111
112 if err := ms.storeOperation(ctx, &entry); err != nil {
113 ms.logger.Error("failed to store operation",
114 "did", entry.DID,
115 "cid", entry.CID,
116 "line", lineCount,
117 "error", err)
118 continue
119 }
120
121 processedCount++
122 }
123 if err := scanner.Err(); err != nil {
124 return fmt.Errorf("error reading response body: %w", err)
125 }
126
127 if processedCount > 0 {
128 if err := ms.updateSyncStatus(ctx, latestOperationTime, time.Now()); err != nil {
129 ms.logger.Error("failed to update sync status", "error", err)
130 }
131 }
132
133 ms.logger.Info("sync batch completed",
134 "total_lines", lineCount,
135 "processed", processedCount,
136 "skipped", lineCount-processedCount)
137 return nil
138}
139
140func (ms *MirrorService) storeOperation(ctx context.Context, entry *PLCExportEntry) error {
141 createdAt, err := time.Parse(time.RFC3339Nano, entry.CreatedAt)
142 if err != nil {
143 return fmt.Errorf("failed to parse created_at timestamp '%s': %w", entry.CreatedAt, err)
144 }
145
146 _, err = ms.client.Operation.
147 Create().
148 SetDid(entry.DID).
149 SetOperation(entry.Operation).
150 SetCid(entry.CID).
151 SetNullified(entry.Nullified).
152 SetCreatedAt(createdAt).
153 Save(ctx)
154
155 if err != nil {
156 if ent.IsConstraintError(err) {
157 ms.logger.Debug("skipping duplicate operation", "did", entry.DID, "cid", entry.CID)
158 return nil
159 }
160 return fmt.Errorf("failed to save operation to database: %w", err)
161 }
162
163 return nil
164}
165
166func (ms *MirrorService) updateSyncStatus(ctx context.Context, lastOPTime, syncTime time.Time) error {
167 return ms.client.SyncStatus.
168 Create().
169 SetKey("last_mirror_sync").
170 SetLastOperationTime(lastOPTime).
171 SetLastSyncTime(syncTime).
172 OnConflictColumns("key").
173 UpdateNewValues().
174 Exec(ctx)
175}
176
177func (ms *MirrorService) Sync(ctx context.Context) error {
178 ticker := time.NewTicker(2 * time.Second)
179 defer ticker.Stop()
180
181 ms.logger.Info("starting sync loop")
182
183 for {
184 select {
185 case <-ctx.Done():
186 return ctx.Err()
187 case <-ticker.C:
188 if err := ms.SyncOnce(ctx); err != nil {
189 ms.logger.Error("sync iteration failed", "error", err)
190 }
191 }
192 }
193}