1package photocopy
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "strings"
8 "time"
9
10 "github.com/araddon/dateparse"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "github.com/haileyok/photocopy/models"
14)
15
16func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error {
17 iat, err := dateparse.ParseAny(indexedAt)
18 if err != nil {
19 return err
20 }
21
22 if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil {
23 p.logger.Error("error creating record", "error", err)
24 }
25
26 switch collection {
27 case "app.bsky.feed.post":
28 return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat)
29 case "app.bsky.graph.follow":
30 return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat)
31 case "app.bsky.feed.like", "app.bsky.feed.repost":
32 return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat)
33 default:
34 return nil
35 }
36}
37
38func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error {
39 var cat time.Time
40 prkey, err := syntax.ParseTID(rkey)
41 if err == nil {
42 cat = prkey.Time()
43 } else {
44 cat = time.Now()
45 }
46
47 rec := models.Record{
48 Did: did,
49 Rkey: rkey,
50 Collection: collection,
51 Cid: cid,
52 Seq: seq,
53 Raw: string(raw),
54 CreatedAt: cat,
55 }
56
57 if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil {
58 return err
59 }
60
61 return nil
62}
63
64func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error {
65 var rec bsky.FeedPost
66 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
67 return err
68 }
69
70 cat, err := parseTimeFromRecord(rec, rkey)
71 if err != nil {
72 return err
73 }
74
75 lang := ""
76 if len(rec.Langs) != 0 {
77 lang = rec.Langs[0]
78 }
79
80 post := models.Post{
81 Uri: uri,
82 Rkey: rkey,
83 CreatedAt: *cat,
84 IndexedAt: indexedAt,
85 Did: did,
86 Lang: lang,
87 Text: rec.Text,
88 }
89
90 if rec.Reply != nil {
91 if rec.Reply.Parent != nil {
92 aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri)
93 if err != nil {
94 return fmt.Errorf("error parsing at-uri: %w", err)
95
96 }
97 post.ParentDid = aturi.Authority().String()
98 post.ParentUri = rec.Reply.Parent.Uri
99 }
100 if rec.Reply.Root != nil {
101 aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri)
102 if err != nil {
103 return fmt.Errorf("error parsing at-uri: %w", err)
104
105 }
106 post.RootDid = aturi.Authority().String()
107 post.RootUri = rec.Reply.Root.Uri
108 }
109 }
110
111 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
112 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri)
113 if err != nil {
114 return fmt.Errorf("error parsing at-uri: %w", err)
115
116 }
117 post.QuoteDid = aturi.Authority().String()
118 post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri
119 } else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
120 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri)
121 if err != nil {
122 return fmt.Errorf("error parsing at-uri: %w", err)
123
124 }
125 post.QuoteDid = aturi.Authority().String()
126 post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
127 }
128
129 if err := p.inserters.postsInserter.Insert(ctx, post); err != nil {
130 return err
131 }
132
133 if rec.Text != "" {
134 go func(ctx context.Context, rec bsky.FeedPost, did, rkey string) {
135 ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
136 defer cancel()
137
138 nervanaItems, err := p.makeNervanaRequest(ctx, rec.Text)
139 if err != nil {
140 p.logger.Error("error making nervana items request", "error", err)
141 return
142 }
143
144 for _, ni := range nervanaItems {
145 postLabel := models.PostLabel{
146 Did: did,
147 Rkey: rkey,
148 Text: ni.Text,
149 Label: ni.Label,
150 EntityId: ni.EntityId,
151 Description: ni.Description,
152 Topic: "",
153 }
154 p.inserters.labelsInserter.Insert(ctx, postLabel)
155 }
156 }(ctx, rec, did, rkey)
157 }
158
159 return nil
160}
161
162func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error {
163 var rec bsky.GraphFollow
164 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
165 return err
166 }
167
168 cat, err := parseTimeFromRecord(rec, rkey)
169 if err != nil {
170 return err
171 }
172
173 follow := models.Follow{
174 Uri: uri,
175 Did: did,
176 Rkey: rkey,
177 CreatedAt: *cat,
178 IndexedAt: indexedAt,
179 Subject: rec.Subject,
180 }
181
182 if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil {
183 return err
184 }
185
186 return nil
187}
188
189func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error {
190 colPts := strings.Split(collection, ".")
191 if len(colPts) < 4 {
192 return fmt.Errorf("invalid collection type %s", collection)
193 }
194
195 interaction := models.Interaction{
196 Uri: uri,
197 Kind: colPts[3],
198 Rkey: rkey,
199 IndexedAt: indexedAt,
200 Did: did,
201 SubjectUri: uri,
202 SubjectDid: did,
203 }
204
205 switch collection {
206 case "app.bsky.feed.like":
207 var rec bsky.FeedLike
208 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
209 return err
210 }
211
212 cat, err := parseTimeFromRecord(rec, rkey)
213 if err != nil {
214 return err
215 }
216
217 if rec.Subject == nil {
218 return fmt.Errorf("invalid subject in like")
219 }
220
221 aturi, err := syntax.ParseATURI(rec.Subject.Uri)
222 if err != nil {
223 return fmt.Errorf("error parsing at-uri: %w", err)
224
225 }
226
227 interaction.SubjectDid = aturi.Authority().String()
228 interaction.SubjectUri = rec.Subject.Uri
229 interaction.CreatedAt = *cat
230 case "app.bsky.feed.repost":
231 var rec bsky.FeedRepost
232 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
233 return err
234 }
235
236 cat, err := parseTimeFromRecord(rec, rkey)
237 if err != nil {
238 return err
239 }
240
241 if rec.Subject == nil {
242 return fmt.Errorf("invalid subject in repost")
243 }
244
245 aturi, err := syntax.ParseATURI(rec.Subject.Uri)
246 if err != nil {
247 return fmt.Errorf("error parsing at-uri: %w", err)
248
249 }
250
251 interaction.SubjectDid = aturi.Authority().String()
252 interaction.SubjectUri = rec.Subject.Uri
253 interaction.CreatedAt = *cat
254 }
255
256 if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil {
257 return err
258 }
259
260 return nil
261}
262
263func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) {
264 var rkeyTime time.Time
265 if rkey != "self" {
266 rt, err := syntax.ParseTID(rkey)
267 if err == nil {
268 rkeyTime = rt.Time()
269 }
270 }
271 switch rec := rec.(type) {
272 case *bsky.FeedPost:
273 t, err := dateparse.ParseAny(rec.CreatedAt)
274 if err != nil {
275 return nil, err
276 }
277
278 if inRange(t) {
279 return &t, nil
280 }
281
282 if rkeyTime.IsZero() || !inRange(rkeyTime) {
283 return timePtr(time.Now()), nil
284 }
285
286 return &rkeyTime, nil
287 case *bsky.FeedRepost:
288 t, err := dateparse.ParseAny(rec.CreatedAt)
289 if err != nil {
290 return nil, err
291 }
292
293 if inRange(t) {
294 return timePtr(t), nil
295 }
296
297 if rkeyTime.IsZero() {
298 return nil, fmt.Errorf("failed to get a useful timestamp from record")
299 }
300
301 return &rkeyTime, nil
302 case *bsky.FeedLike:
303 t, err := dateparse.ParseAny(rec.CreatedAt)
304 if err != nil {
305 return nil, err
306 }
307
308 if inRange(t) {
309 return timePtr(t), nil
310 }
311
312 if rkeyTime.IsZero() {
313 return nil, fmt.Errorf("failed to get a useful timestamp from record")
314 }
315
316 return &rkeyTime, nil
317 case *bsky.ActorProfile:
318 // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one
319 return timePtr(time.Now()), nil
320 case *bsky.FeedGenerator:
321 if !rkeyTime.IsZero() && inRange(rkeyTime) {
322 return &rkeyTime, nil
323 }
324 return timePtr(time.Now()), nil
325 default:
326 if !rkeyTime.IsZero() && inRange(rkeyTime) {
327 return &rkeyTime, nil
328 }
329 return timePtr(time.Now()), nil
330 }
331}
332
// inRange reports whether t is a plausible record timestamp. It must be:
//   - no more than five 365-day years before the current time, and
//   - no more than 200 days after it.
//
// Both bounds are inclusive.
func inRange(t time.Time) bool {
	const (
		maxAge  = 5 * 365 * 24 * time.Hour
		maxSkew = 200 * 24 * time.Hour
	)

	now := time.Now()
	if !t.Before(now) {
		return t.Sub(now) <= maxSkew
	}
	return now.Sub(t) <= maxAge
}
340
// timePtr returns a pointer to a fresh copy of t. This lets callers take the
// address of an expression result such as time.Now().
func timePtr(t time.Time) *time.Time {
	ptr := new(time.Time)
	*ptr = t
	return ptr
}