1package photocopy
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "strings"
8 "time"
9
10 "github.com/araddon/dateparse"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "github.com/haileyok/photocopy/models"
14)
15
16func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error {
17 iat, err := dateparse.ParseAny(indexedAt)
18 if err != nil {
19 return err
20 }
21
22 if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil {
23 p.logger.Error("error creating record", "error", err)
24 }
25
26 switch collection {
27 case "app.bsky.feed.post":
28 return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat)
29 case "app.bsky.graph.follow":
30 return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat)
31 case "app.bsky.feed.like", "app.bsky.feed.repost":
32 return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat)
33 default:
34 return nil
35 }
36}
37
38func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error {
39 var cat time.Time
40 prkey, err := syntax.ParseTID(rkey)
41 if err == nil {
42 cat = prkey.Time()
43 } else {
44 cat = time.Now()
45 }
46
47 rec := models.Record{
48 Did: did,
49 Rkey: rkey,
50 Collection: collection,
51 Cid: cid,
52 Seq: seq,
53 Raw: raw,
54 CreatedAt: cat,
55 }
56
57 if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil {
58 return err
59 }
60
61 return nil
62}
63
64func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error {
65 var rec bsky.FeedPost
66 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
67 return err
68 }
69
70 cat, err := parseTimeFromRecord(rec, rkey)
71 if err != nil {
72 return err
73 }
74
75 post := models.Post{
76 Uri: uri,
77 Rkey: rkey,
78 CreatedAt: *cat,
79 IndexedAt: indexedAt,
80 Did: did,
81 }
82
83 if rec.Reply != nil {
84 if rec.Reply.Parent != nil {
85 aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri)
86 if err != nil {
87 return fmt.Errorf("error parsing at-uri: %w", err)
88
89 }
90 post.ParentDid = aturi.Authority().String()
91 post.ParentUri = rec.Reply.Parent.Uri
92 }
93 if rec.Reply.Root != nil {
94 aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri)
95 if err != nil {
96 return fmt.Errorf("error parsing at-uri: %w", err)
97
98 }
99 post.RootDid = aturi.Authority().String()
100 post.RootUri = rec.Reply.Root.Uri
101 }
102 }
103
104 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
105 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri)
106 if err != nil {
107 return fmt.Errorf("error parsing at-uri: %w", err)
108
109 }
110 post.QuoteDid = aturi.Authority().String()
111 post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri
112 } else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
113 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri)
114 if err != nil {
115 return fmt.Errorf("error parsing at-uri: %w", err)
116
117 }
118 post.QuoteDid = aturi.Authority().String()
119 post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
120 }
121
122 if err := p.inserters.postsInserter.Insert(ctx, post); err != nil {
123 return err
124 }
125
126 return nil
127}
128
129func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error {
130 var rec bsky.GraphFollow
131 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
132 return err
133 }
134
135 cat, err := parseTimeFromRecord(rec, rkey)
136 if err != nil {
137 return err
138 }
139
140 follow := models.Follow{
141 Uri: uri,
142 Did: did,
143 Rkey: rkey,
144 CreatedAt: *cat,
145 IndexedAt: indexedAt,
146 Subject: rec.Subject,
147 }
148
149 if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil {
150 return err
151 }
152
153 return nil
154}
155
156func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error {
157 colPts := strings.Split(collection, ".")
158 if len(colPts) < 4 {
159 return fmt.Errorf("invalid collection type %s", collection)
160 }
161
162 interaction := models.Interaction{
163 Uri: uri,
164 Kind: colPts[3],
165 Rkey: rkey,
166 IndexedAt: indexedAt,
167 Did: did,
168 SubjectUri: uri,
169 SubjectDid: did,
170 }
171
172 switch collection {
173 case "app.bsky.feed.like":
174 var rec bsky.FeedLike
175 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
176 return err
177 }
178
179 cat, err := parseTimeFromRecord(rec, rkey)
180 if err != nil {
181 return err
182 }
183
184 if rec.Subject == nil {
185 return fmt.Errorf("invalid subject in like")
186 }
187
188 aturi, err := syntax.ParseATURI(rec.Subject.Uri)
189 if err != nil {
190 return fmt.Errorf("error parsing at-uri: %w", err)
191
192 }
193
194 interaction.SubjectDid = aturi.Authority().String()
195 interaction.SubjectUri = rec.Subject.Uri
196 interaction.CreatedAt = *cat
197 case "app.bsky.feed.repost":
198 var rec bsky.FeedRepost
199 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
200 return err
201 }
202
203 cat, err := parseTimeFromRecord(rec, rkey)
204 if err != nil {
205 return err
206 }
207
208 if rec.Subject == nil {
209 return fmt.Errorf("invalid subject in repost")
210 }
211
212 aturi, err := syntax.ParseATURI(rec.Subject.Uri)
213 if err != nil {
214 return fmt.Errorf("error parsing at-uri: %w", err)
215
216 }
217
218 interaction.SubjectDid = aturi.Authority().String()
219 interaction.SubjectUri = rec.Subject.Uri
220 interaction.CreatedAt = *cat
221 }
222
223 if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil {
224 return err
225 }
226
227 return nil
228}
229
230func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) {
231 var rkeyTime time.Time
232 if rkey != "self" {
233 rt, err := syntax.ParseTID(rkey)
234 if err == nil {
235 rkeyTime = rt.Time()
236 }
237 }
238 switch rec := rec.(type) {
239 case *bsky.FeedPost:
240 t, err := dateparse.ParseAny(rec.CreatedAt)
241 if err != nil {
242 return nil, err
243 }
244
245 if inRange(t) {
246 return &t, nil
247 }
248
249 if rkeyTime.IsZero() || !inRange(rkeyTime) {
250 return timePtr(time.Now()), nil
251 }
252
253 return &rkeyTime, nil
254 case *bsky.FeedRepost:
255 t, err := dateparse.ParseAny(rec.CreatedAt)
256 if err != nil {
257 return nil, err
258 }
259
260 if inRange(t) {
261 return timePtr(t), nil
262 }
263
264 if rkeyTime.IsZero() {
265 return nil, fmt.Errorf("failed to get a useful timestamp from record")
266 }
267
268 return &rkeyTime, nil
269 case *bsky.FeedLike:
270 t, err := dateparse.ParseAny(rec.CreatedAt)
271 if err != nil {
272 return nil, err
273 }
274
275 if inRange(t) {
276 return timePtr(t), nil
277 }
278
279 if rkeyTime.IsZero() {
280 return nil, fmt.Errorf("failed to get a useful timestamp from record")
281 }
282
283 return &rkeyTime, nil
284 case *bsky.ActorProfile:
285 // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one
286 return timePtr(time.Now()), nil
287 case *bsky.FeedGenerator:
288 if !rkeyTime.IsZero() && inRange(rkeyTime) {
289 return &rkeyTime, nil
290 }
291 return timePtr(time.Now()), nil
292 default:
293 if !rkeyTime.IsZero() && inRange(rkeyTime) {
294 return &rkeyTime, nil
295 }
296 return timePtr(time.Now()), nil
297 }
298}
299
300func inRange(t time.Time) bool {
301 now := time.Now()
302 if t.Before(now) {
303 return now.Sub(t) <= time.Hour*24*365*5
304 }
305 return t.Sub(now) <= time.Hour*24*200
306}
307
308func timePtr(t time.Time) *time.Time {
309 return &t
310}