1package photocopy
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "slices"
8 "strings"
9 "time"
10
11 "github.com/araddon/dateparse"
12 "github.com/bluesky-social/indigo/api/bsky"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/haileyok/photocopy/models"
15)
16
17func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error {
18 iat, err := dateparse.ParseAny(indexedAt)
19 if err != nil {
20 return err
21 }
22
23 if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil {
24 p.logger.Error("error creating record", "error", err)
25 }
26
27 switch collection {
28 case "app.bsky.feed.post":
29 return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat)
30 case "app.bsky.graph.follow":
31 return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat)
32 case "app.bsky.feed.like", "app.bsky.feed.repost":
33 return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat)
34 default:
35 return nil
36 }
37}
38
39func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error {
40 var cat time.Time
41 prkey, err := syntax.ParseTID(rkey)
42 if err == nil {
43 cat = prkey.Time()
44 } else {
45 cat = time.Now()
46 }
47
48 rec := models.Record{
49 Did: did,
50 Rkey: rkey,
51 Collection: collection,
52 Cid: cid,
53 Seq: seq,
54 Raw: string(raw),
55 CreatedAt: cat,
56 }
57
58 if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil {
59 return err
60 }
61
62 return nil
63}
64
65func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error {
66 var rec bsky.FeedPost
67 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
68 return err
69 }
70
71 cat, err := parseTimeFromRecord(rec, rkey)
72 if err != nil {
73 return err
74 }
75
76 lang := ""
77 if len(rec.Langs) != 0 {
78 lang = rec.Langs[0]
79 }
80
81 post := models.Post{
82 Uri: uri,
83 Rkey: rkey,
84 CreatedAt: *cat,
85 IndexedAt: indexedAt,
86 Did: did,
87 Lang: lang,
88 Text: rec.Text,
89 }
90
91 if rec.Reply != nil {
92 if rec.Reply.Parent != nil {
93 aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri)
94 if err != nil {
95 return fmt.Errorf("error parsing at-uri: %w", err)
96
97 }
98 post.ParentDid = aturi.Authority().String()
99 post.ParentUri = rec.Reply.Parent.Uri
100 }
101 if rec.Reply.Root != nil {
102 aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri)
103 if err != nil {
104 return fmt.Errorf("error parsing at-uri: %w", err)
105
106 }
107 post.RootDid = aturi.Authority().String()
108 post.RootUri = rec.Reply.Root.Uri
109 }
110 }
111
112 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
113 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri)
114 if err != nil {
115 return fmt.Errorf("error parsing at-uri: %w", err)
116
117 }
118 post.QuoteDid = aturi.Authority().String()
119 post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri
120 } else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
121 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri)
122 if err != nil {
123 return fmt.Errorf("error parsing at-uri: %w", err)
124
125 }
126 post.QuoteDid = aturi.Authority().String()
127 post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
128 }
129
130 if err := p.inserters.postsInserter.Insert(ctx, post); err != nil {
131 return err
132 }
133
134 isEn := slices.Contains(rec.Langs, "en")
135 if rec.Text != "" && rec.Reply == nil && isEn && p.nervanaClient != nil {
136 go func(ctx context.Context, rec bsky.FeedPost, did, rkey string) {
137 ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
138 defer cancel()
139
140 nervanaItems, err := p.nervanaClient.MakeRequest(ctx, rec.Text)
141 if err != nil {
142 p.logger.Error("error making nervana items request", "error", err)
143 return
144 }
145
146 for _, ni := range nervanaItems {
147 postLabel := models.PostLabel{
148 Did: did,
149 Rkey: rkey,
150 Text: ni.Text,
151 Label: ni.Label,
152 EntityId: ni.EntityId,
153 Description: ni.Description,
154 Topic: "",
155 CreatedAt: time.Now(),
156 }
157 p.inserters.labelsInserter.Insert(ctx, postLabel)
158 }
159 }(ctx, rec, did, rkey)
160 }
161
162 return nil
163}
164
165func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error {
166 var rec bsky.GraphFollow
167 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
168 return err
169 }
170
171 cat, err := parseTimeFromRecord(rec, rkey)
172 if err != nil {
173 return err
174 }
175
176 follow := models.Follow{
177 Uri: uri,
178 Did: did,
179 Rkey: rkey,
180 CreatedAt: *cat,
181 IndexedAt: indexedAt,
182 Subject: rec.Subject,
183 }
184
185 if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil {
186 return err
187 }
188
189 return nil
190}
191
192func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error {
193 colPts := strings.Split(collection, ".")
194 if len(colPts) < 4 {
195 return fmt.Errorf("invalid collection type %s", collection)
196 }
197
198 interaction := models.Interaction{
199 Uri: uri,
200 Kind: colPts[3],
201 Rkey: rkey,
202 IndexedAt: indexedAt,
203 Did: did,
204 SubjectUri: uri,
205 SubjectDid: did,
206 }
207
208 switch collection {
209 case "app.bsky.feed.like":
210 var rec bsky.FeedLike
211 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
212 return err
213 }
214
215 cat, err := parseTimeFromRecord(rec, rkey)
216 if err != nil {
217 return err
218 }
219
220 if rec.Subject == nil {
221 return fmt.Errorf("invalid subject in like")
222 }
223
224 aturi, err := syntax.ParseATURI(rec.Subject.Uri)
225 if err != nil {
226 return fmt.Errorf("error parsing at-uri: %w", err)
227
228 }
229
230 interaction.SubjectDid = aturi.Authority().String()
231 interaction.SubjectUri = rec.Subject.Uri
232 interaction.CreatedAt = *cat
233 case "app.bsky.feed.repost":
234 var rec bsky.FeedRepost
235 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
236 return err
237 }
238
239 cat, err := parseTimeFromRecord(rec, rkey)
240 if err != nil {
241 return err
242 }
243
244 if rec.Subject == nil {
245 return fmt.Errorf("invalid subject in repost")
246 }
247
248 aturi, err := syntax.ParseATURI(rec.Subject.Uri)
249 if err != nil {
250 return fmt.Errorf("error parsing at-uri: %w", err)
251
252 }
253
254 interaction.SubjectDid = aturi.Authority().String()
255 interaction.SubjectUri = rec.Subject.Uri
256 interaction.CreatedAt = *cat
257 }
258
259 if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil {
260 return err
261 }
262
263 return nil
264}
265
266func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) {
267 var rkeyTime time.Time
268 if rkey != "self" {
269 rt, err := syntax.ParseTID(rkey)
270 if err == nil {
271 rkeyTime = rt.Time()
272 }
273 }
274 switch rec := rec.(type) {
275 case *bsky.FeedPost:
276 t, err := dateparse.ParseAny(rec.CreatedAt)
277 if err != nil {
278 return nil, err
279 }
280
281 if inRange(t) {
282 return &t, nil
283 }
284
285 if rkeyTime.IsZero() || !inRange(rkeyTime) {
286 return timePtr(time.Now()), nil
287 }
288
289 return &rkeyTime, nil
290 case *bsky.FeedRepost:
291 t, err := dateparse.ParseAny(rec.CreatedAt)
292 if err != nil {
293 return nil, err
294 }
295
296 if inRange(t) {
297 return timePtr(t), nil
298 }
299
300 if rkeyTime.IsZero() {
301 return nil, fmt.Errorf("failed to get a useful timestamp from record")
302 }
303
304 return &rkeyTime, nil
305 case *bsky.FeedLike:
306 t, err := dateparse.ParseAny(rec.CreatedAt)
307 if err != nil {
308 return nil, err
309 }
310
311 if inRange(t) {
312 return timePtr(t), nil
313 }
314
315 if rkeyTime.IsZero() {
316 return nil, fmt.Errorf("failed to get a useful timestamp from record")
317 }
318
319 return &rkeyTime, nil
320 case *bsky.ActorProfile:
321 // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one
322 return timePtr(time.Now()), nil
323 case *bsky.FeedGenerator:
324 if !rkeyTime.IsZero() && inRange(rkeyTime) {
325 return &rkeyTime, nil
326 }
327 return timePtr(time.Now()), nil
328 default:
329 if !rkeyTime.IsZero() && inRange(rkeyTime) {
330 return &rkeyTime, nil
331 }
332 return timePtr(time.Now()), nil
333 }
334}
335
// inRange reports whether t is a plausible record timestamp relative to the
// current time: at most five 365-day years in the past, or at most 200 days
// in the future. Both bounds are inclusive.
func inRange(t time.Time) bool {
	const (
		maxPast   = 5 * 365 * 24 * time.Hour
		maxFuture = 200 * 24 * time.Hour
	)

	now := time.Now()
	if !t.Before(now) {
		return t.Sub(now) <= maxFuture
	}
	return now.Sub(t) <= maxPast
}
343
// timePtr returns a pointer to a fresh copy of t.
func timePtr(t time.Time) *time.Time {
	p := new(time.Time)
	*p = t
	return p
}