this repo has no description
1package photocopy 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "slices" 8 "strings" 9 "time" 10 11 "github.com/araddon/dateparse" 12 "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/haileyok/photocopy/models" 15) 16 17func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error { 18 iat, err := dateparse.ParseAny(indexedAt) 19 if err != nil { 20 return err 21 } 22 23 if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil { 24 p.logger.Error("error creating record", "error", err) 25 } 26 27 switch collection { 28 case "app.bsky.feed.post": 29 return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat) 30 case "app.bsky.graph.follow": 31 return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat) 32 case "app.bsky.feed.like", "app.bsky.feed.repost": 33 return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat) 34 default: 35 return nil 36 } 37} 38 39func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error { 40 var cat time.Time 41 prkey, err := syntax.ParseTID(rkey) 42 if err == nil { 43 cat = prkey.Time() 44 } else { 45 cat = time.Now() 46 } 47 48 rec := models.Record{ 49 Did: did, 50 Rkey: rkey, 51 Collection: collection, 52 Cid: cid, 53 Seq: seq, 54 Raw: string(raw), 55 CreatedAt: cat, 56 } 57 58 if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil { 59 return err 60 } 61 62 return nil 63} 64 65func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error { 66 var rec bsky.FeedPost 67 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 68 return err 69 } 70 71 cat, err := parseTimeFromRecord(rec, rkey) 72 if err != nil { 73 return err 74 } 75 76 lang := "" 77 if len(rec.Langs) != 0 { 78 lang = rec.Langs[0] 79 } 80 81 post := models.Post{ 82 Uri: uri, 83 Rkey: rkey, 84 CreatedAt: *cat, 85 IndexedAt: indexedAt, 86 Did: did, 87 Lang: lang, 88 Text: rec.Text, 89 } 90 91 if rec.Reply != nil { 92 if rec.Reply.Parent != nil { 93 aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri) 94 if err != nil { 95 return fmt.Errorf("error parsing at-uri: %w", err) 96 97 } 98 post.ParentDid = aturi.Authority().String() 99 post.ParentUri = rec.Reply.Parent.Uri 100 } 101 if rec.Reply.Root != nil { 102 aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri) 103 if err != nil { 104 return fmt.Errorf("error parsing at-uri: %w", err) 105 106 } 107 post.RootDid = aturi.Authority().String() 108 post.RootUri = rec.Reply.Root.Uri 109 } 110 } 111 112 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil { 113 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri) 114 if err != nil { 115 return fmt.Errorf("error parsing at-uri: %w", err) 116 117 } 118 post.QuoteDid = aturi.Authority().String() 119 post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri 120 } else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil { 121 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri) 122 if err != nil { 123 return fmt.Errorf("error parsing at-uri: %w", err) 124 125 } 126 post.QuoteDid = aturi.Authority().String() 127 post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri 128 } 129 130 if err := p.inserters.postsInserter.Insert(ctx, post); err != nil { 131 return err 132 } 133 134 isEn := slices.Contains(rec.Langs, "en") 135 if rec.Text != "" && rec.Reply == nil && isEn && p.nervanaClient != nil { 136 go func(ctx context.Context, rec bsky.FeedPost, did, rkey string) { 137 ctx, cancel := context.WithTimeout(ctx, 5*time.Second) 138 defer cancel() 139 140 nervanaItems, err := p.makeNervanaRequest(ctx, rec.Text) 141 if err != nil { 142 p.logger.Error("error making nervana items request", "error", err) 143 return 144 } 145 146 for _, ni := range nervanaItems { 147 postLabel := models.PostLabel{ 148 Did: did, 149 Rkey: rkey, 150 Text: ni.Text, 151 Label: ni.Label, 152 EntityId: ni.EntityId, 153 Description: ni.Description, 154 Topic: "", 155 } 156 p.inserters.labelsInserter.Insert(ctx, postLabel) 157 } 158 }(ctx, rec, did, rkey) 159 } 160 161 return nil 162} 163 164func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error { 165 var rec bsky.GraphFollow 166 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 167 return err 168 } 169 170 cat, err := parseTimeFromRecord(rec, rkey) 171 if err != nil { 172 return err 173 } 174 175 follow := models.Follow{ 176 Uri: uri, 177 Did: did, 178 Rkey: rkey, 179 CreatedAt: *cat, 180 IndexedAt: indexedAt, 181 Subject: rec.Subject, 182 } 183 184 if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil { 185 return err 186 } 187 188 return nil 189} 190 191func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error { 192 colPts := strings.Split(collection, ".") 193 if len(colPts) < 4 { 194 return fmt.Errorf("invalid collection type %s", collection) 195 } 196 197 interaction := models.Interaction{ 198 Uri: uri, 199 Kind: colPts[3], 200 Rkey: rkey, 201 IndexedAt: indexedAt, 202 Did: did, 203 SubjectUri: uri, 204 SubjectDid: did, 205 } 206 207 switch collection { 208 case "app.bsky.feed.like": 209 var rec bsky.FeedLike 210 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 211 return err 212 } 213 214 cat, err := parseTimeFromRecord(rec, rkey) 215 if err != nil { 216 return err 217 } 218 219 if rec.Subject == nil { 220 return fmt.Errorf("invalid subject in like") 221 } 222 223 aturi, err := syntax.ParseATURI(rec.Subject.Uri) 224 if err != nil { 225 return fmt.Errorf("error parsing at-uri: %w", err) 226 227 } 228 229 interaction.SubjectDid = aturi.Authority().String() 230 interaction.SubjectUri = rec.Subject.Uri 231 interaction.CreatedAt = *cat 232 case "app.bsky.feed.repost": 233 var rec bsky.FeedRepost 234 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 235 return err 236 } 237 238 cat, err := parseTimeFromRecord(rec, rkey) 239 if err != nil { 240 return err 241 } 242 243 if rec.Subject == nil { 244 return fmt.Errorf("invalid subject in repost") 245 } 246 247 aturi, err := syntax.ParseATURI(rec.Subject.Uri) 248 if err != nil { 249 return fmt.Errorf("error parsing at-uri: %w", err) 250 251 } 252 253 interaction.SubjectDid = aturi.Authority().String() 254 interaction.SubjectUri = rec.Subject.Uri 255 interaction.CreatedAt = *cat 256 } 257 258 if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil { 259 return err 260 } 261 262 return nil 263} 264 265func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) { 266 var rkeyTime time.Time 267 if rkey != "self" { 268 rt, err := syntax.ParseTID(rkey) 269 if err == nil { 270 rkeyTime = rt.Time() 271 } 272 } 273 switch rec := rec.(type) { 274 case *bsky.FeedPost: 275 t, err := dateparse.ParseAny(rec.CreatedAt) 276 if err != nil { 277 return nil, err 278 } 279 280 if inRange(t) { 281 return &t, nil 282 } 283 284 if rkeyTime.IsZero() || !inRange(rkeyTime) { 285 return timePtr(time.Now()), nil 286 } 287 288 return &rkeyTime, nil 289 case *bsky.FeedRepost: 290 t, err := dateparse.ParseAny(rec.CreatedAt) 291 if err != nil { 292 return nil, err 293 } 294 295 if inRange(t) { 296 return timePtr(t), nil 297 } 298 299 if rkeyTime.IsZero() { 300 return nil, fmt.Errorf("failed to get a useful timestamp from record") 301 } 302 303 return &rkeyTime, nil 304 case *bsky.FeedLike: 305 t, err := dateparse.ParseAny(rec.CreatedAt) 306 if err != nil { 307 return nil, err 308 } 309 310 if inRange(t) { 311 return timePtr(t), nil 312 } 313 314 if rkeyTime.IsZero() { 315 return nil, fmt.Errorf("failed to get a useful timestamp from record") 316 } 317 318 return &rkeyTime, nil 319 case *bsky.ActorProfile: 320 // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one 321 return timePtr(time.Now()), nil 322 case *bsky.FeedGenerator: 323 if !rkeyTime.IsZero() && inRange(rkeyTime) { 324 return &rkeyTime, nil 325 } 326 return timePtr(time.Now()), nil 327 default: 328 if !rkeyTime.IsZero() && inRange(rkeyTime) { 329 return &rkeyTime, nil 330 } 331 return timePtr(time.Now()), nil 332 } 333} 334 335func inRange(t time.Time) bool { 336 now := time.Now() 337 if t.Before(now) { 338 return now.Sub(t) <= time.Hour*24*365*5 339 } 340 return t.Sub(now) <= time.Hour*24*200 341} 342 343func timePtr(t time.Time) *time.Time { 344 return &t 345}