this repo has no description
at main 8.5 kB view raw
1package photocopy 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "slices" 8 "strings" 9 "time" 10 11 "github.com/araddon/dateparse" 12 "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/haileyok/photocopy/models" 15) 16 17func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error { 18 iat, err := dateparse.ParseAny(indexedAt) 19 if err != nil { 20 return err 21 } 22 23 if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil { 24 p.logger.Error("error creating record", "error", err) 25 } 26 27 switch collection { 28 case "app.bsky.feed.post": 29 return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat) 30 case "app.bsky.graph.follow": 31 return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat) 32 case "app.bsky.feed.like", "app.bsky.feed.repost": 33 return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat) 34 default: 35 return nil 36 } 37} 38 39func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error { 40 var cat time.Time 41 prkey, err := syntax.ParseTID(rkey) 42 if err == nil { 43 cat = prkey.Time() 44 } else { 45 cat = time.Now() 46 } 47 48 rec := models.Record{ 49 Did: did, 50 Rkey: rkey, 51 Collection: collection, 52 Cid: cid, 53 Seq: seq, 54 Raw: string(raw), 55 CreatedAt: cat, 56 } 57 58 if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil { 59 return err 60 } 61 62 return nil 63} 64 65func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error { 66 var rec bsky.FeedPost 67 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 68 return err 69 } 70 71 cat, err := parseTimeFromRecord(rec, rkey) 72 if err != nil { 73 return err 74 } 75 76 lang := "" 77 if len(rec.Langs) != 0 { 78 lang = rec.Langs[0] 79 } 80 81 post := models.Post{ 82 Uri: uri, 83 Rkey: rkey, 84 CreatedAt: *cat, 85 IndexedAt: indexedAt, 86 Did: did, 87 Lang: lang, 88 Text: rec.Text, 89 } 90 91 if rec.Reply != nil { 92 if rec.Reply.Parent != nil { 93 aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri) 94 if err != nil { 95 return fmt.Errorf("error parsing at-uri: %w", err) 96 97 } 98 post.ParentDid = aturi.Authority().String() 99 post.ParentUri = rec.Reply.Parent.Uri 100 } 101 if rec.Reply.Root != nil { 102 aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri) 103 if err != nil { 104 return fmt.Errorf("error parsing at-uri: %w", err) 105 106 } 107 post.RootDid = aturi.Authority().String() 108 post.RootUri = rec.Reply.Root.Uri 109 } 110 } 111 112 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil { 113 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri) 114 if err != nil { 115 return fmt.Errorf("error parsing at-uri: %w", err) 116 117 } 118 post.QuoteDid = aturi.Authority().String() 119 post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri 120 } else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil { 121 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri) 122 if err != nil { 123 return fmt.Errorf("error parsing at-uri: %w", err) 124 125 } 126 post.QuoteDid = aturi.Authority().String() 127 post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri 128 } 129 130 if err := p.inserters.postsInserter.Insert(ctx, post); err != nil { 131 return err 132 } 133 134 isEn := slices.Contains(rec.Langs, "en") 135 if rec.Text != "" && rec.Reply == nil && isEn && p.nervanaClient != nil { 136 go func(ctx context.Context, rec bsky.FeedPost, did, rkey string) { 137 ctx, cancel := context.WithTimeout(ctx, 5*time.Second) 138 defer cancel() 139 140 nervanaItems, err := p.nervanaClient.MakeRequest(ctx, rec.Text) 141 if err != nil { 142 p.logger.Error("error making nervana items request", "error", err) 143 return 144 } 145 146 for _, ni := range nervanaItems { 147 postLabel := models.PostLabel{ 148 Did: did, 149 Rkey: rkey, 150 Text: ni.Text, 151 Label: ni.Label, 152 EntityId: ni.EntityId, 153 Description: ni.Description, 154 Topic: "", 155 CreatedAt: time.Now(), 156 } 157 p.inserters.labelsInserter.Insert(ctx, postLabel) 158 } 159 }(ctx, rec, did, rkey) 160 } 161 162 return nil 163} 164 165func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error { 166 var rec bsky.GraphFollow 167 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 168 return err 169 } 170 171 cat, err := parseTimeFromRecord(rec, rkey) 172 if err != nil { 173 return err 174 } 175 176 follow := models.Follow{ 177 Uri: uri, 178 Did: did, 179 Rkey: rkey, 180 CreatedAt: *cat, 181 IndexedAt: indexedAt, 182 Subject: rec.Subject, 183 } 184 185 if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil { 186 return err 187 } 188 189 return nil 190} 191 192func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error { 193 colPts := strings.Split(collection, ".") 194 if len(colPts) < 4 { 195 return fmt.Errorf("invalid collection type %s", collection) 196 } 197 198 interaction := models.Interaction{ 199 Uri: uri, 200 Kind: colPts[3], 201 Rkey: rkey, 202 IndexedAt: indexedAt, 203 Did: did, 204 SubjectUri: uri, 205 SubjectDid: did, 206 } 207 208 switch collection { 209 case "app.bsky.feed.like": 210 var rec bsky.FeedLike 211 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 212 return err 213 } 214 215 cat, err := parseTimeFromRecord(rec, rkey) 216 if err != nil { 217 return err 218 } 219 220 if rec.Subject == nil { 221 return fmt.Errorf("invalid subject in like") 222 } 223 224 aturi, err := syntax.ParseATURI(rec.Subject.Uri) 225 if err != nil { 226 return fmt.Errorf("error parsing at-uri: %w", err) 227 228 } 229 230 interaction.SubjectDid = aturi.Authority().String() 231 interaction.SubjectUri = rec.Subject.Uri 232 interaction.CreatedAt = *cat 233 case "app.bsky.feed.repost": 234 var rec bsky.FeedRepost 235 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 236 return err 237 } 238 239 cat, err := parseTimeFromRecord(rec, rkey) 240 if err != nil { 241 return err 242 } 243 244 if rec.Subject == nil { 245 return fmt.Errorf("invalid subject in repost") 246 } 247 248 aturi, err := syntax.ParseATURI(rec.Subject.Uri) 249 if err != nil { 250 return fmt.Errorf("error parsing at-uri: %w", err) 251 252 } 253 254 interaction.SubjectDid = aturi.Authority().String() 255 interaction.SubjectUri = rec.Subject.Uri 256 interaction.CreatedAt = *cat 257 } 258 259 if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil { 260 return err 261 } 262 263 return nil 264} 265 266func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) { 267 var rkeyTime time.Time 268 if rkey != "self" { 269 rt, err := syntax.ParseTID(rkey) 270 if err == nil { 271 rkeyTime = rt.Time() 272 } 273 } 274 switch rec := rec.(type) { 275 case *bsky.FeedPost: 276 t, err := dateparse.ParseAny(rec.CreatedAt) 277 if err != nil { 278 return nil, err 279 } 280 281 if inRange(t) { 282 return &t, nil 283 } 284 285 if rkeyTime.IsZero() || !inRange(rkeyTime) { 286 return timePtr(time.Now()), nil 287 } 288 289 return &rkeyTime, nil 290 case *bsky.FeedRepost: 291 t, err := dateparse.ParseAny(rec.CreatedAt) 292 if err != nil { 293 return nil, err 294 } 295 296 if inRange(t) { 297 return timePtr(t), nil 298 } 299 300 if rkeyTime.IsZero() { 301 return nil, fmt.Errorf("failed to get a useful timestamp from record") 302 } 303 304 return &rkeyTime, nil 305 case *bsky.FeedLike: 306 t, err := dateparse.ParseAny(rec.CreatedAt) 307 if err != nil { 308 return nil, err 309 } 310 311 if inRange(t) { 312 return timePtr(t), nil 313 } 314 315 if rkeyTime.IsZero() { 316 return nil, fmt.Errorf("failed to get a useful timestamp from record") 317 } 318 319 return &rkeyTime, nil 320 case *bsky.ActorProfile: 321 // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one 322 return timePtr(time.Now()), nil 323 case *bsky.FeedGenerator: 324 if !rkeyTime.IsZero() && inRange(rkeyTime) { 325 return &rkeyTime, nil 326 } 327 return timePtr(time.Now()), nil 328 default: 329 if !rkeyTime.IsZero() && inRange(rkeyTime) { 330 return &rkeyTime, nil 331 } 332 return timePtr(time.Now()), nil 333 } 334} 335 336func inRange(t time.Time) bool { 337 now := time.Now() 338 if t.Before(now) { 339 return now.Sub(t) <= time.Hour*24*365*5 340 } 341 return t.Sub(now) <= time.Hour*24*200 342} 343 344func timePtr(t time.Time) *time.Time { 345 return &t 346}