1package photocopy
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "strings"
8 "time"
9
10 "github.com/araddon/dateparse"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "github.com/haileyok/photocopy/models"
14)
15
16func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error {
17 iat, err := dateparse.ParseAny(indexedAt)
18 if err != nil {
19 return err
20 }
21
22 if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil {
23 p.logger.Error("error creating record", "error", err)
24 }
25
26 switch collection {
27 case "app.bsky.feed.post":
28 return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat)
29 case "app.bsky.graph.follow":
30 return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat)
31 case "app.bsky.feed.like", "app.bsky.feed.repost":
32 return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat)
33 default:
34 return nil
35 }
36}
37
38func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error {
39 var cat time.Time
40 prkey, err := syntax.ParseTID(rkey)
41 if err == nil {
42 cat = prkey.Time()
43 } else {
44 cat = time.Now()
45 }
46
47 rec := models.Record{
48 Did: did,
49 Rkey: rkey,
50 Collection: collection,
51 Cid: cid,
52 Seq: seq,
53 Raw: string(raw),
54 CreatedAt: cat,
55 }
56
57 if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil {
58 return err
59 }
60
61 return nil
62}
63
64func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error {
65 var rec bsky.FeedPost
66 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
67 return err
68 }
69
70 cat, err := parseTimeFromRecord(rec, rkey)
71 if err != nil {
72 return err
73 }
74
75 lang := ""
76 if len(rec.Langs) != 0 {
77 lang = rec.Langs[0]
78 }
79
80 post := models.Post{
81 Uri: uri,
82 Rkey: rkey,
83 CreatedAt: *cat,
84 IndexedAt: indexedAt,
85 Did: did,
86 Lang: lang,
87 }
88
89 if rec.Reply != nil {
90 if rec.Reply.Parent != nil {
91 aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri)
92 if err != nil {
93 return fmt.Errorf("error parsing at-uri: %w", err)
94
95 }
96 post.ParentDid = aturi.Authority().String()
97 post.ParentUri = rec.Reply.Parent.Uri
98 }
99 if rec.Reply.Root != nil {
100 aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri)
101 if err != nil {
102 return fmt.Errorf("error parsing at-uri: %w", err)
103
104 }
105 post.RootDid = aturi.Authority().String()
106 post.RootUri = rec.Reply.Root.Uri
107 }
108 }
109
110 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
111 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri)
112 if err != nil {
113 return fmt.Errorf("error parsing at-uri: %w", err)
114
115 }
116 post.QuoteDid = aturi.Authority().String()
117 post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri
118 } else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
119 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri)
120 if err != nil {
121 return fmt.Errorf("error parsing at-uri: %w", err)
122
123 }
124 post.QuoteDid = aturi.Authority().String()
125 post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
126 }
127
128 if err := p.inserters.postsInserter.Insert(ctx, post); err != nil {
129 return err
130 }
131
132 return nil
133}
134
135func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error {
136 var rec bsky.GraphFollow
137 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
138 return err
139 }
140
141 cat, err := parseTimeFromRecord(rec, rkey)
142 if err != nil {
143 return err
144 }
145
146 follow := models.Follow{
147 Uri: uri,
148 Did: did,
149 Rkey: rkey,
150 CreatedAt: *cat,
151 IndexedAt: indexedAt,
152 Subject: rec.Subject,
153 }
154
155 if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil {
156 return err
157 }
158
159 return nil
160}
161
162func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error {
163 colPts := strings.Split(collection, ".")
164 if len(colPts) < 4 {
165 return fmt.Errorf("invalid collection type %s", collection)
166 }
167
168 interaction := models.Interaction{
169 Uri: uri,
170 Kind: colPts[3],
171 Rkey: rkey,
172 IndexedAt: indexedAt,
173 Did: did,
174 SubjectUri: uri,
175 SubjectDid: did,
176 }
177
178 switch collection {
179 case "app.bsky.feed.like":
180 var rec bsky.FeedLike
181 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
182 return err
183 }
184
185 cat, err := parseTimeFromRecord(rec, rkey)
186 if err != nil {
187 return err
188 }
189
190 if rec.Subject == nil {
191 return fmt.Errorf("invalid subject in like")
192 }
193
194 aturi, err := syntax.ParseATURI(rec.Subject.Uri)
195 if err != nil {
196 return fmt.Errorf("error parsing at-uri: %w", err)
197
198 }
199
200 interaction.SubjectDid = aturi.Authority().String()
201 interaction.SubjectUri = rec.Subject.Uri
202 interaction.CreatedAt = *cat
203 case "app.bsky.feed.repost":
204 var rec bsky.FeedRepost
205 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
206 return err
207 }
208
209 cat, err := parseTimeFromRecord(rec, rkey)
210 if err != nil {
211 return err
212 }
213
214 if rec.Subject == nil {
215 return fmt.Errorf("invalid subject in repost")
216 }
217
218 aturi, err := syntax.ParseATURI(rec.Subject.Uri)
219 if err != nil {
220 return fmt.Errorf("error parsing at-uri: %w", err)
221
222 }
223
224 interaction.SubjectDid = aturi.Authority().String()
225 interaction.SubjectUri = rec.Subject.Uri
226 interaction.CreatedAt = *cat
227 }
228
229 if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil {
230 return err
231 }
232
233 return nil
234}
235
236func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) {
237 var rkeyTime time.Time
238 if rkey != "self" {
239 rt, err := syntax.ParseTID(rkey)
240 if err == nil {
241 rkeyTime = rt.Time()
242 }
243 }
244 switch rec := rec.(type) {
245 case *bsky.FeedPost:
246 t, err := dateparse.ParseAny(rec.CreatedAt)
247 if err != nil {
248 return nil, err
249 }
250
251 if inRange(t) {
252 return &t, nil
253 }
254
255 if rkeyTime.IsZero() || !inRange(rkeyTime) {
256 return timePtr(time.Now()), nil
257 }
258
259 return &rkeyTime, nil
260 case *bsky.FeedRepost:
261 t, err := dateparse.ParseAny(rec.CreatedAt)
262 if err != nil {
263 return nil, err
264 }
265
266 if inRange(t) {
267 return timePtr(t), nil
268 }
269
270 if rkeyTime.IsZero() {
271 return nil, fmt.Errorf("failed to get a useful timestamp from record")
272 }
273
274 return &rkeyTime, nil
275 case *bsky.FeedLike:
276 t, err := dateparse.ParseAny(rec.CreatedAt)
277 if err != nil {
278 return nil, err
279 }
280
281 if inRange(t) {
282 return timePtr(t), nil
283 }
284
285 if rkeyTime.IsZero() {
286 return nil, fmt.Errorf("failed to get a useful timestamp from record")
287 }
288
289 return &rkeyTime, nil
290 case *bsky.ActorProfile:
291 // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one
292 return timePtr(time.Now()), nil
293 case *bsky.FeedGenerator:
294 if !rkeyTime.IsZero() && inRange(rkeyTime) {
295 return &rkeyTime, nil
296 }
297 return timePtr(time.Now()), nil
298 default:
299 if !rkeyTime.IsZero() && inRange(rkeyTime) {
300 return &rkeyTime, nil
301 }
302 return timePtr(time.Now()), nil
303 }
304}
305
// inRange reports whether t lies within an accepted window around the current
// time: at most five 365-day years in the past, or at most 200 days in the
// future.
func inRange(t time.Time) bool {
	const (
		maxAge  = time.Hour * 24 * 365 * 5
		maxLead = time.Hour * 24 * 200
	)

	now := time.Now()
	if !t.Before(now) {
		// t is now or later: bound how far ahead it may be.
		return t.Sub(now) <= maxLead
	}
	return now.Sub(t) <= maxAge
}
313
// timePtr returns a pointer to a fresh copy of t.
func timePtr(t time.Time) *time.Time {
	p := new(time.Time)
	*p = t
	return p
}