package photocopy

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/haileyok/photocopy/clickhouse_inserter"
)

// PLCScraper polls the plc.directory export endpoint and forwards each entry
// to a ClickHouse inserter, persisting its position in a cursor file.
type PLCScraper struct {
	client     *http.Client
	logger     *slog.Logger
	cursor     string
	cursorFile string
	inserter   *clickhouse_inserter.Inserter
}

type PLCScraperArgs struct {
	Logger     *slog.Logger
	Inserter   *clickhouse_inserter.Inserter
	CursorFile string
}

func NewPLCScraper(ctx context.Context, args PLCScraperArgs) (*PLCScraper, error) {
	if args.Logger == nil {
		args.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
			Level: slog.LevelInfo,
		}))
	}

	cli := &http.Client{
		Timeout: 15 * time.Second,
	}

	return &PLCScraper{
		client:     cli,
		logger:     args.Logger,
		inserter:   args.Inserter,
		cursorFile: args.CursorFile,
	}, nil
}

// Run polls the export endpoint on a ticker until ctx is cancelled.
func (s *PLCScraper) Run(ctx context.Context) error {
	startCursor, err := s.getCursor()
	if err != nil {
		s.logger.Error("error getting cursor", "error", err)
	}
	s.cursor = startCursor

	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	currTickerDuration := 3 * time.Second

	setTickerDuration := func(d time.Duration) {
		if currTickerDuration == d {
			return
		}
		ticker.Reset(d)
		currTickerDuration = d
	}

	for {
		select {
		case <-ctx.Done():
			// stop scraping once the context is cancelled
			return ctx.Err()
		case <-ticker.C:
		}

		s.logger.Info("performing scrape", "cursor", s.cursor)

		ustr := "https://plc.directory/export?limit=1000"
		if s.cursor != "" {
			ustr += "&after=" + s.cursor
			// Poll faster while backfilling; slow down once the cursor is
			// within an hour of the live tail.
			t, err := time.Parse(time.RFC3339Nano, s.cursor)
			if err == nil && time.Since(t) > time.Hour {
				setTickerDuration(800 * time.Millisecond)
			} else {
				setTickerDuration(3 * time.Second)
			}
		}

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, ustr, nil)
		if err != nil {
			s.logger.Error("error creating request", "error", err)
			continue
		}

		resp, err := s.client.Do(req)
		if err != nil {
			s.logger.Error("error getting response", "error", err)
			continue
		}

		if resp.StatusCode != http.StatusOK {
			io.Copy(io.Discard, resp.Body)
			resp.Body.Close()
			s.logger.Error("export returned non-200 status", "status", resp.StatusCode)
			continue
		}

		b, err := io.ReadAll(resp.Body)
		// Close inside the loop; a deferred close here would not run until Run returns.
		resp.Body.Close()
		if err != nil {
			s.logger.Error("error reading response body", "error", err)
			continue
		}

		// The export endpoint returns JSON lines, one PLC entry per line. Trim
		// the trailing newline so the last element is the final entry.
		rawEntries := strings.Split(strings.TrimSpace(string(b)), "\n")

		for i, rawEntry := range rawEntries {
			if rawEntry == "" {
				continue
			}

			var entry PLCEntry
			if err := json.Unmarshal([]byte(rawEntry), &entry); err != nil {
				s.logger.Error("error unmarshaling entry", "error", err)
				continue
			}

			// stop inserting if the context is cancelled
			if ctx.Err() != nil {
				return ctx.Err()
			}

			if i == len(rawEntries)-1 {
				s.cursor = entry.CreatedAt.Format(time.RFC3339Nano)
				// TODO: this should check that the new cursor is newer than the one already saved before overwriting it
				if err := s.saveCursor(s.cursor); err != nil {
					s.logger.Error("error saving cursor", "error", err)
				}
			}

			chEntry, err := entry.prepareForClickhouse()
			if err != nil {
				s.logger.Error("error getting clickhouse entry from plc entry", "error", err)
				continue
			}

			s.inserter.Insert(ctx, *chEntry)
		}
	}
}

// getCursor loads the last saved cursor, returning an empty cursor if the
// file does not exist yet.
func (s *PLCScraper) getCursor() (string, error) {
	cursor, err := os.ReadFile(s.cursorFile)
	if err != nil {
		if os.IsNotExist(err) {
			return "", nil
		}

		return "", fmt.Errorf("failed to read cursor: %w", err)
	}
	return strings.TrimSpace(string(cursor)), nil
}

func (s *PLCScraper) saveCursor(cursor string) error {
	if err := os.WriteFile(s.cursorFile, []byte(cursor), 0644); err != nil {
		return fmt.Errorf("failed to save cursor: %w", err)
	}
	return nil
}
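
// runPLCScraperSketch is a minimal sketch of how Run might be wired up, not
// part of the scraper itself. It assumes the caller has already constructed a
// *clickhouse_inserter.Inserter (that package's constructor is not shown in
// this file) and uses a hypothetical cursor file path.
func runPLCScraperSketch(ctx context.Context, inserter *clickhouse_inserter.Inserter) error {
	scraper, err := NewPLCScraper(ctx, PLCScraperArgs{
		Inserter:   inserter,
		CursorFile: "plc_cursor.txt", // hypothetical location for cursor persistence
	})
	if err != nil {
		return err
	}

	// Run blocks until ctx is cancelled, so callers would typically derive ctx
	// from a signal handler or similar.
	return scraper.Run(ctx)
}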