package photocopy

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"os"
	"strings"
	"time"

	"github.com/haileyok/photocopy/clickhouse_inserter"
)

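// PLCScraper periodically scrapes the plc.directory export feed and hands each
// entry to a ClickHouse inserter, persisting its position between runs via a
// cursor file.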
type PLCScraper struct {
	client     *http.Client
	logger     *slog.Logger
	cursor     string
	cursorFile string
	inserter   *clickhouse_inserter.Inserter
}

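// PLCScraperArgs holds the dependencies for NewPLCScraper. Logger is optional;
// when nil, a text logger writing to stdout at info level is used.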
type PLCScraperArgs struct {
	Logger     *slog.Logger
	Inserter   *clickhouse_inserter.Inserter
	CursorFile string
}

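// NewPLCScraper constructs a PLCScraper from the given arguments.
//
// A rough usage sketch (the inserter value and cursor file path below are
// illustrative, not defined by this package):
//
//	scraper, err := NewPLCScraper(ctx, PLCScraperArgs{
//		Inserter:   inserter, // a previously constructed *clickhouse_inserter.Inserter
//		CursorFile: "plc_cursor.txt",
//	})
//	if err != nil {
//		return err
//	}
//	if err := scraper.Run(ctx); err != nil {
//		return err
//	}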
func NewPLCScraper(ctx context.Context, args PLCScraperArgs) (*PLCScraper, error) {
	if args.Logger == nil {
		args.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
			Level: slog.LevelInfo,
		}))
	}

	cli := &http.Client{
		Timeout: 15 * time.Second,
	}

	return &PLCScraper{
		client:     cli,
		logger:     args.Logger,
		inserter:   args.Inserter,
		cursorFile: args.CursorFile,
	}, nil
}

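// Run polls the export endpoint until ctx is cancelled, tightening the poll
// interval while the cursor is far behind and saving the cursor after each
// page so a restart can resume where it left off.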
func (s *PLCScraper) Run(ctx context.Context) error {
	startCursor, err := s.getCursor()
	if err != nil {
		// Failing to read an existing cursor file is not fatal; scraping just
		// starts from the beginning of the export.
		s.logger.Error("error getting cursor", "error", err)
	}
	s.cursor = startCursor

	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	currTickerDuration := 3 * time.Second

	setTickerDuration := func(d time.Duration) {
		if currTickerDuration == d {
			return
		}
		ticker.Reset(d)
		currTickerDuration = d
	}

	for {
		// Wait for the next tick, but return promptly if the context is cancelled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}

		s.logger.Info("performing scrape", "cursor", s.cursor)

		ustr := "https://plc.directory/export?limit=1000"
		if s.cursor != "" {
			ustr += "&after=" + url.QueryEscape(s.cursor)
			// Poll quickly while the cursor is more than an hour behind (backfill)
			// and fall back to the base interval once caught up.
			t, err := time.Parse(time.RFC3339Nano, s.cursor)
			if err == nil && time.Since(t) > 1*time.Hour {
				setTickerDuration(800 * time.Millisecond)
			} else {
				setTickerDuration(3 * time.Second)
			}
		}

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, ustr, nil)
		if err != nil {
			s.logger.Error("error creating request", "error", err)
			continue
		}

		resp, err := s.client.Do(req)
		if err != nil {
			s.logger.Error("error getting response", "error", err)
			continue
		}

		if resp.StatusCode != http.StatusOK {
			// Drain and close the body so the connection can be reused.
			io.Copy(io.Discard, resp.Body)
			resp.Body.Close()
			s.logger.Error("export returned non-200 status", "status", resp.StatusCode)
			continue
		}

		b, err := io.ReadAll(resp.Body)
		// Close explicitly rather than with defer: a defer inside this loop would
		// pile up until Run returns, leaking response bodies.
		resp.Body.Close()
		if err != nil {
			s.logger.Error("error reading response body", "error", err)
			continue
		}

		// The export is newline-delimited JSON; trim surrounding whitespace so a
		// trailing newline doesn't leave an empty final element, which would keep
		// the last-entry check below from ever advancing the cursor.
		rawEntries := strings.Split(strings.TrimSpace(string(b)), "\n")

		for i, rawEntry := range rawEntries {
			if rawEntry == "" {
				continue
			}

			var entry PLCEntry
			if err := json.Unmarshal([]byte(rawEntry), &entry); err != nil {
				s.logger.Error("error unmarshaling entry", "error", err)
				continue
			}

			// stop inserting if the context is cancelled
			if ctx.Err() != nil {
				return ctx.Err()
			}

			if i == len(rawEntries)-1 {
				s.cursor = entry.CreatedAt.Format(time.RFC3339Nano)
				// TODO: only save when this cursor is newer than the one already on disk
				if err := s.saveCursor(s.cursor); err != nil {
					s.logger.Error("error saving cursor", "error", err)
				}
			}

			chEntry, err := entry.prepareForClickhouse()
			if err != nil {
				s.logger.Error("error getting clickhouse entry from plc entry", "error", err)
				continue
			}

			s.inserter.Insert(ctx, *chEntry)
		}
	}
}

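// getCursor loads the last saved cursor from the cursor file. A missing file is
// not an error; it simply means there is no cursor yet.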
func (s *PLCScraper) getCursor() (string, error) {
	cursor, err := os.ReadFile(s.cursorFile)
	if err != nil {
		if os.IsNotExist(err) {
			return "", nil
		}

		return "", fmt.Errorf("failed to read cursor: %w", err)
	}
	// Trim whitespace in case the file was hand-edited with a trailing newline.
	return strings.TrimSpace(string(cursor)), nil
}

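// saveCursor writes the cursor to the cursor file so a later run can resume
// from the same position.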
func (s *PLCScraper) saveCursor(cursor string) error {
	if err := os.WriteFile(s.cursorFile, []byte(cursor), 0644); err != nil {
		return fmt.Errorf("failed to save cursor: %w", err)
	}
	return nil
}