1package mirror 2 3import ( 4 "bufio" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "time" 11 12 "tangled.sh/seiso.moe/alethia.directory/ent" 13 "tangled.sh/seiso.moe/alethia.directory/ent/operation" 14 "tangled.sh/seiso.moe/alethia.directory/pkg/plc" 15) 16 17type PLCExportEntry struct { 18 DID string `json:"did"` 19 Operation plc.PLCOperation `json:"operation"` 20 CID string `json:"cid"` 21 Nullified bool `json:"nullified"` 22 CreatedAt string `json:"createdAt"` 23} 24 25type MirrorService struct { 26 client *ent.Client 27 logger *slog.Logger 28} 29 30func NewMirrorService(client *ent.Client, logger *slog.Logger) *MirrorService { 31 return &MirrorService{ 32 client: client, 33 logger: logger, 34 } 35} 36 37func handleRateLimit() time.Duration { 38 // plc.directory does not use any rate limit headers so 39 // just default to using a 5 minute rate limit for now. 40 return time.Minute * 5 41} 42 43func (ms *MirrorService) SyncOnce(ctx context.Context) error { 44 var after time.Time 45 46 latestOperation, err := ms.client.Operation.Query(). 47 Order(ent.Desc(operation.FieldCreatedAt)). 48 First(ctx) 49 50 if err != nil { 51 if ent.IsNotFound(err) { 52 ms.logger.Info("no existing operations found, starting from beginning") 53 after = time.Time{} 54 } else { 55 return fmt.Errorf("failed to query latest operation: %w", err) 56 } 57 } else { 58 after = latestOperation.CreatedAt 59 ms.logger.Debug("resuming sync from timestamp", "after", after.Format(time.RFC3339)) 60 } 61 62 formatted := after.Format(time.RFC3339Nano) 63 url := fmt.Sprintf("https://plc.directory/export?count=1000&after=%s", formatted) 64 65 ms.logger.Debug("fetching operations", "url", url) 66 67 resp, err := http.Get(url) 68 if err != nil { 69 return fmt.Errorf("failed to fetch from PLC directory: %w", err) 70 } 71 defer resp.Body.Close() 72 73 if resp.StatusCode == 429 { 74 retryAfter := handleRateLimit() 75 ms.logger.Warn("rate limited, waiting before retry", "retry_after", retryAfter) 76 time.Sleep(retryAfter) 77 return nil 78 } 79 80 if resp.StatusCode != 200 { 81 return fmt.Errorf("unexpected HTTP status: %d %s", resp.StatusCode, resp.Status) 82 } 83 84 return ms.processResponse(ctx, resp) 85 86} 87 88func (ms *MirrorService) processResponse(ctx context.Context, resp *http.Response) error { 89 scanner := bufio.NewScanner(resp.Body) 90 lineCount := 0 91 processedCount := 0 92 var latestOperationTime time.Time 93 94 for scanner.Scan() { 95 lineCount++ 96 97 var entry PLCExportEntry 98 if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { 99 ms.logger.Error("failed to unmarshal entry", "line", lineCount, "error", err) 100 continue 101 } 102 103 operationTime, err := time.Parse(time.RFC3339Nano, entry.CreatedAt) 104 if err != nil { 105 continue 106 } 107 108 if operationTime.After(latestOperationTime) { 109 latestOperationTime = operationTime 110 } 111 112 if err := ms.storeOperation(ctx, &entry); err != nil { 113 ms.logger.Error("failed to store operation", 114 "did", entry.DID, 115 "cid", entry.CID, 116 "line", lineCount, 117 "error", err) 118 continue 119 } 120 121 processedCount++ 122 } 123 if err := scanner.Err(); err != nil { 124 return fmt.Errorf("error reading response body: %w", err) 125 } 126 127 if processedCount > 0 { 128 if err := ms.updateSyncStatus(ctx, latestOperationTime, time.Now()); err != nil { 129 ms.logger.Error("failed to update sync status", "error", err) 130 } 131 } 132 133 ms.logger.Info("sync batch completed", 134 "total_lines", lineCount, 135 "processed", processedCount, 136 "skipped", lineCount-processedCount) 137 return nil 138} 139 140func (ms *MirrorService) storeOperation(ctx context.Context, entry *PLCExportEntry) error { 141 createdAt, err := time.Parse(time.RFC3339Nano, entry.CreatedAt) 142 if err != nil { 143 return fmt.Errorf("failed to parse created_at timestamp '%s': %w", entry.CreatedAt, err) 144 } 145 146 _, err = ms.client.Operation. 147 Create(). 148 SetDid(entry.DID). 149 SetOperation(entry.Operation). 150 SetCid(entry.CID). 151 SetNullified(entry.Nullified). 152 SetCreatedAt(createdAt). 153 Save(ctx) 154 155 if err != nil { 156 if ent.IsConstraintError(err) { 157 ms.logger.Debug("skipping duplicate operation", "did", entry.DID, "cid", entry.CID) 158 return nil 159 } 160 return fmt.Errorf("failed to save operation to database: %w", err) 161 } 162 163 return nil 164} 165 166func (ms *MirrorService) updateSyncStatus(ctx context.Context, lastOPTime, syncTime time.Time) error { 167 return ms.client.SyncStatus. 168 Create(). 169 SetKey("last_mirror_sync"). 170 SetLastOperationTime(lastOPTime). 171 SetLastSyncTime(syncTime). 172 OnConflictColumns("key"). 173 UpdateNewValues(). 174 Exec(ctx) 175} 176 177func (ms *MirrorService) Sync(ctx context.Context) error { 178 ticker := time.NewTicker(2 * time.Second) 179 defer ticker.Stop() 180 181 ms.logger.Info("starting sync loop") 182 183 for { 184 select { 185 case <-ctx.Done(): 186 return ctx.Err() 187 case <-ticker.C: 188 if err := ms.SyncOnce(ctx); err != nil { 189 ms.logger.Error("sync iteration failed", "error", err) 190 } 191 } 192 } 193}