package photocopy

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"os"
	"strings"
	"time"

	inserter "github.com/haileyok/photocopy/internal"
)

// PLCScraper polls the plc.directory export endpoint and hands each entry to
// the configured inserter, persisting its position in a cursor file so it can
// resume where it left off.
type PLCScraper struct {
	client     *http.Client
	logger     *slog.Logger
	cursor     string
	cursorFile string
	inserter   *inserter.Inserter
}

// PLCScraperArgs configures a new PLCScraper.
type PLCScraperArgs struct {
	Logger     *slog.Logger
	Inserter   *inserter.Inserter
	CursorFile string
}

// NewPLCScraper creates a scraper. If no logger is provided, a default text
// logger writing to stdout at Info level is used.
func NewPLCScraper(ctx context.Context, args PLCScraperArgs) (*PLCScraper, error) {
	if args.Logger == nil {
		args.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
			Level: slog.LevelInfo,
		}))
	}

	cli := &http.Client{
		Timeout: 15 * time.Second,
	}

	return &PLCScraper{
		client:     cli,
		logger:     args.Logger,
		inserter:   args.Inserter,
		cursorFile: args.CursorFile,
	}, nil
}

// Run scrapes the export endpoint on a ticker until ctx is cancelled. While
// the cursor is more than an hour behind, it polls faster to catch up.
func (s *PLCScraper) Run(ctx context.Context) error {
	startCursor, err := s.getCursor()
	if err != nil {
		s.logger.Error("error getting cursor", "error", err)
	}
	s.cursor = startCursor

	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	currTickerDuration := 3 * time.Second

	setTickerDuration := func(d time.Duration) {
		if currTickerDuration == d {
			return
		}
		ticker.Reset(d)
		currTickerDuration = d
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}

		s.logger.Info("performing scrape", "cursor", s.cursor)

		ustr := "https://plc.directory/export?limit=1000"
		if s.cursor != "" {
			ustr += "&after=" + url.QueryEscape(s.cursor)

			// Poll quickly while backfilling; slow down once we are within an
			// hour of the present.
			if t, err := time.Parse(time.RFC3339Nano, s.cursor); err == nil && time.Since(t) > time.Hour {
				setTickerDuration(600 * time.Millisecond)
			} else {
				setTickerDuration(3 * time.Second)
			}
		}

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, ustr, nil)
		if err != nil {
			s.logger.Error("error creating request", "error", err)
			continue
		}

		resp, err := s.client.Do(req)
		if err != nil {
			s.logger.Error("error getting response", "error", err)
			continue
		}

		if resp.StatusCode != http.StatusOK {
			io.Copy(io.Discard, resp.Body)
			resp.Body.Close()
			s.logger.Error("export returned non-200 status", "status", resp.StatusCode)
			continue
		}

		b, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			s.logger.Error("error reading response body", "error", err)
			continue
		}

		// The export is newline-delimited JSON, one PLC operation per line.
		rawEntries := strings.Split(string(b), "\n")

		var lastCreatedAt string
		for _, rawEntry := range rawEntries {
			if rawEntry == "" {
				continue
			}

			var entry PLCEntry
			if err := json.Unmarshal([]byte(rawEntry), &entry); err != nil {
				s.logger.Error("error unmarshaling entry", "error", err)
				continue
			}

			// stop inserting if context is cancelled
			if ctx.Err() != nil {
				return ctx.Err()
			}

			lastCreatedAt = entry.CreatedAt.Format(time.RFC3339Nano)

			entry.prepareForBigQuery()

			s.inserter.Insert(ctx, entry)
		}

		// Advance the cursor to the last entry we processed.
		if lastCreatedAt != "" {
			s.cursor = lastCreatedAt
			// TODO: this should check that the new cursor is newer than the
			// one already saved before overwriting it
			if err := s.saveCursor(s.cursor); err != nil {
				s.logger.Error("error saving cursor", "error", err)
			}
		}
	}
}

// getCursor reads the persisted cursor from disk. A missing file is not an
// error; it simply means we start from the beginning of the export.
func (s *PLCScraper) getCursor() (string, error) {
	cursor, err := os.ReadFile(s.cursorFile)
	if err != nil {
		if os.IsNotExist(err) {
			return "", nil
		}

		return "", fmt.Errorf("failed to read cursor: %w", err)
	}
	return strings.TrimSpace(string(cursor)), nil
}

// saveCursor persists the cursor to disk so a restart can resume from it.
func (s *PLCScraper) saveCursor(cursor string) error {
	if err := os.WriteFile(s.cursorFile, []byte(cursor), 0o644); err != nil {
		return fmt.Errorf("failed to save cursor: %w", err)
	}
	return nil
}
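
// Example wiring (a minimal sketch, not part of the package API): how a caller
// might construct and run the scraper. The inserter value is assumed to be
// built elsewhere via the internal package, and "plc-cursor.txt" is a
// hypothetical cursor path.
//
//	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
//	defer cancel()
//
//	scraper, err := NewPLCScraper(ctx, PLCScraperArgs{
//		Inserter:   ins, // assumed *inserter.Inserter constructed by the caller
//		CursorFile: "plc-cursor.txt",
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := scraper.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
//		log.Fatal(err)
//	}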