1package photocopy
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "slices"
8 "strings"
9 "time"
10
11 "github.com/araddon/dateparse"
12 "github.com/bluesky-social/indigo/api/bsky"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/haileyok/photocopy/models"
15)
16
17func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error {
18 iat, err := dateparse.ParseAny(indexedAt)
19 if err != nil {
20 return err
21 }
22
23 if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil {
24 p.logger.Error("error creating record", "error", err)
25 }
26
27 switch collection {
28 case "app.bsky.feed.post":
29 return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat)
30 case "app.bsky.graph.follow":
31 return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat)
32 case "app.bsky.feed.like", "app.bsky.feed.repost":
33 return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat)
34 default:
35 return nil
36 }
37}
38
39func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error {
40 var cat time.Time
41 prkey, err := syntax.ParseTID(rkey)
42 if err == nil {
43 cat = prkey.Time()
44 } else {
45 cat = time.Now()
46 }
47
48 rec := models.Record{
49 Did: did,
50 Rkey: rkey,
51 Collection: collection,
52 Cid: cid,
53 Seq: seq,
54 Raw: string(raw),
55 CreatedAt: cat,
56 }
57
58 if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil {
59 return err
60 }
61
62 return nil
63}
64
65func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error {
66 var rec bsky.FeedPost
67 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
68 return err
69 }
70
71 cat, err := parseTimeFromRecord(rec, rkey)
72 if err != nil {
73 return err
74 }
75
76 lang := ""
77 if len(rec.Langs) != 0 {
78 lang = rec.Langs[0]
79 }
80
81 post := models.Post{
82 Uri: uri,
83 Rkey: rkey,
84 CreatedAt: *cat,
85 IndexedAt: indexedAt,
86 Did: did,
87 Lang: lang,
88 Text: rec.Text,
89 }
90
91 if rec.Reply != nil {
92 if rec.Reply.Parent != nil {
93 aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri)
94 if err != nil {
95 return fmt.Errorf("error parsing at-uri: %w", err)
96
97 }
98 post.ParentDid = aturi.Authority().String()
99 post.ParentUri = rec.Reply.Parent.Uri
100 }
101 if rec.Reply.Root != nil {
102 aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri)
103 if err != nil {
104 return fmt.Errorf("error parsing at-uri: %w", err)
105
106 }
107 post.RootDid = aturi.Authority().String()
108 post.RootUri = rec.Reply.Root.Uri
109 }
110 }
111
112 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
113 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri)
114 if err != nil {
115 return fmt.Errorf("error parsing at-uri: %w", err)
116
117 }
118 post.QuoteDid = aturi.Authority().String()
119 post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri
120 } else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
121 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri)
122 if err != nil {
123 return fmt.Errorf("error parsing at-uri: %w", err)
124
125 }
126 post.QuoteDid = aturi.Authority().String()
127 post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
128 }
129
130 if err := p.inserters.postsInserter.Insert(ctx, post); err != nil {
131 return err
132 }
133
134 isEn := slices.Contains(rec.Langs, "en")
135 if rec.Text != "" && rec.Reply == nil && isEn && p.nervanaClient != nil {
136 go func(ctx context.Context, rec bsky.FeedPost, did, rkey string) {
137 ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
138 defer cancel()
139
140 nervanaItems, err := p.makeNervanaRequest(ctx, rec.Text)
141 if err != nil {
142 p.logger.Error("error making nervana items request", "error", err)
143 return
144 }
145
146 for _, ni := range nervanaItems {
147 postLabel := models.PostLabel{
148 Did: did,
149 Rkey: rkey,
150 Text: ni.Text,
151 Label: ni.Label,
152 EntityId: ni.EntityId,
153 Description: ni.Description,
154 Topic: "",
155 }
156 p.inserters.labelsInserter.Insert(ctx, postLabel)
157 }
158 }(ctx, rec, did, rkey)
159 }
160
161 return nil
162}
163
164func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error {
165 var rec bsky.GraphFollow
166 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
167 return err
168 }
169
170 cat, err := parseTimeFromRecord(rec, rkey)
171 if err != nil {
172 return err
173 }
174
175 follow := models.Follow{
176 Uri: uri,
177 Did: did,
178 Rkey: rkey,
179 CreatedAt: *cat,
180 IndexedAt: indexedAt,
181 Subject: rec.Subject,
182 }
183
184 if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil {
185 return err
186 }
187
188 return nil
189}
190
191func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error {
192 colPts := strings.Split(collection, ".")
193 if len(colPts) < 4 {
194 return fmt.Errorf("invalid collection type %s", collection)
195 }
196
197 interaction := models.Interaction{
198 Uri: uri,
199 Kind: colPts[3],
200 Rkey: rkey,
201 IndexedAt: indexedAt,
202 Did: did,
203 SubjectUri: uri,
204 SubjectDid: did,
205 }
206
207 switch collection {
208 case "app.bsky.feed.like":
209 var rec bsky.FeedLike
210 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
211 return err
212 }
213
214 cat, err := parseTimeFromRecord(rec, rkey)
215 if err != nil {
216 return err
217 }
218
219 if rec.Subject == nil {
220 return fmt.Errorf("invalid subject in like")
221 }
222
223 aturi, err := syntax.ParseATURI(rec.Subject.Uri)
224 if err != nil {
225 return fmt.Errorf("error parsing at-uri: %w", err)
226
227 }
228
229 interaction.SubjectDid = aturi.Authority().String()
230 interaction.SubjectUri = rec.Subject.Uri
231 interaction.CreatedAt = *cat
232 case "app.bsky.feed.repost":
233 var rec bsky.FeedRepost
234 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
235 return err
236 }
237
238 cat, err := parseTimeFromRecord(rec, rkey)
239 if err != nil {
240 return err
241 }
242
243 if rec.Subject == nil {
244 return fmt.Errorf("invalid subject in repost")
245 }
246
247 aturi, err := syntax.ParseATURI(rec.Subject.Uri)
248 if err != nil {
249 return fmt.Errorf("error parsing at-uri: %w", err)
250
251 }
252
253 interaction.SubjectDid = aturi.Authority().String()
254 interaction.SubjectUri = rec.Subject.Uri
255 interaction.CreatedAt = *cat
256 }
257
258 if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil {
259 return err
260 }
261
262 return nil
263}
264
265func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) {
266 var rkeyTime time.Time
267 if rkey != "self" {
268 rt, err := syntax.ParseTID(rkey)
269 if err == nil {
270 rkeyTime = rt.Time()
271 }
272 }
273 switch rec := rec.(type) {
274 case *bsky.FeedPost:
275 t, err := dateparse.ParseAny(rec.CreatedAt)
276 if err != nil {
277 return nil, err
278 }
279
280 if inRange(t) {
281 return &t, nil
282 }
283
284 if rkeyTime.IsZero() || !inRange(rkeyTime) {
285 return timePtr(time.Now()), nil
286 }
287
288 return &rkeyTime, nil
289 case *bsky.FeedRepost:
290 t, err := dateparse.ParseAny(rec.CreatedAt)
291 if err != nil {
292 return nil, err
293 }
294
295 if inRange(t) {
296 return timePtr(t), nil
297 }
298
299 if rkeyTime.IsZero() {
300 return nil, fmt.Errorf("failed to get a useful timestamp from record")
301 }
302
303 return &rkeyTime, nil
304 case *bsky.FeedLike:
305 t, err := dateparse.ParseAny(rec.CreatedAt)
306 if err != nil {
307 return nil, err
308 }
309
310 if inRange(t) {
311 return timePtr(t), nil
312 }
313
314 if rkeyTime.IsZero() {
315 return nil, fmt.Errorf("failed to get a useful timestamp from record")
316 }
317
318 return &rkeyTime, nil
319 case *bsky.ActorProfile:
320 // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one
321 return timePtr(time.Now()), nil
322 case *bsky.FeedGenerator:
323 if !rkeyTime.IsZero() && inRange(rkeyTime) {
324 return &rkeyTime, nil
325 }
326 return timePtr(time.Now()), nil
327 default:
328 if !rkeyTime.IsZero() && inRange(rkeyTime) {
329 return &rkeyTime, nil
330 }
331 return timePtr(time.Now()), nil
332 }
333}
334
// inRange reports whether t is a plausible record timestamp relative to now:
// at most five 365-day years in the past, or at most 200 days in the future.
// A t equal to now counts as "not in the past" and is accepted.
func inRange(t time.Time) bool {
	const (
		maxAge    = 5 * 365 * 24 * time.Hour
		maxAhead  = 200 * 24 * time.Hour
	)

	now := time.Now()
	switch {
	case t.Before(now):
		return now.Sub(t) <= maxAge
	default:
		return t.Sub(now) <= maxAhead
	}
}
342
// timePtr returns a pointer to a fresh copy of t.
func timePtr(t time.Time) *time.Time {
	p := new(time.Time)
	*p = t
	return p
}