this repo has no description
1package photocopy 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "strings" 8 "time" 9 10 "github.com/araddon/dateparse" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "github.com/haileyok/photocopy/models" 14) 15 16func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error { 17 iat, err := dateparse.ParseAny(indexedAt) 18 if err != nil { 19 return err 20 } 21 22 if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil { 23 p.logger.Error("error creating record", "error", err) 24 } 25 26 switch collection { 27 case "app.bsky.feed.post": 28 return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat) 29 case "app.bsky.graph.follow": 30 return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat) 31 case "app.bsky.feed.like", "app.bsky.feed.repost": 32 return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat) 33 default: 34 return nil 35 } 36} 37 38func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error { 39 var cat time.Time 40 prkey, err := syntax.ParseTID(rkey) 41 if err == nil { 42 cat = prkey.Time() 43 } else { 44 cat = time.Now() 45 } 46 47 rec := models.Record{ 48 Did: did, 49 Rkey: rkey, 50 Collection: collection, 51 Cid: cid, 52 Seq: seq, 53 Raw: string(raw), 54 CreatedAt: cat, 55 } 56 57 if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil { 58 return err 59 } 60 61 return nil 62} 63 64func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error { 65 var rec bsky.FeedPost 66 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 67 return err 68 } 69 70 cat, err := parseTimeFromRecord(rec, rkey) 71 if err != nil { 72 return err 73 } 74 75 lang := "" 76 if len(rec.Langs) != 0 { 77 lang = rec.Langs[0] 78 } 79 80 post := models.Post{ 81 Uri: uri, 82 Rkey: rkey, 83 CreatedAt: *cat, 84 IndexedAt: indexedAt, 85 Did: did, 86 Lang: lang, 87 Text: rec.Text, 88 } 89 90 if rec.Reply != nil { 91 if rec.Reply.Parent != nil { 92 aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri) 93 if err != nil { 94 return fmt.Errorf("error parsing at-uri: %w", err) 95 96 } 97 post.ParentDid = aturi.Authority().String() 98 post.ParentUri = rec.Reply.Parent.Uri 99 } 100 if rec.Reply.Root != nil { 101 aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri) 102 if err != nil { 103 return fmt.Errorf("error parsing at-uri: %w", err) 104 105 } 106 post.RootDid = aturi.Authority().String() 107 post.RootUri = rec.Reply.Root.Uri 108 } 109 } 110 111 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil { 112 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri) 113 if err != nil { 114 return fmt.Errorf("error parsing at-uri: %w", err) 115 116 } 117 post.QuoteDid = aturi.Authority().String() 118 post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri 119 } else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil { 120 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri) 121 if err != nil { 122 return fmt.Errorf("error parsing at-uri: %w", err) 123 124 } 125 post.QuoteDid = aturi.Authority().String() 126 post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri 127 } 128 129 if err := p.inserters.postsInserter.Insert(ctx, post); err != nil { 130 return err 131 } 132 133 if rec.Text != "" && p.nervanaClient != nil { 134 go func(ctx context.Context, rec bsky.FeedPost, did, rkey string) { 135 ctx, cancel := context.WithTimeout(ctx, 5*time.Second) 136 defer cancel() 137 138 nervanaItems, err := p.makeNervanaRequest(ctx, rec.Text) 139 if err != nil { 140 p.logger.Error("error making nervana items request", "error", err) 141 return 142 } 143 144 for _, ni := range nervanaItems { 145 postLabel := models.PostLabel{ 146 Did: did, 147 Rkey: rkey, 148 Text: ni.Text, 149 Label: ni.Label, 150 EntityId: ni.EntityId, 151 Description: ni.Description, 152 Topic: "", 153 } 154 p.inserters.labelsInserter.Insert(ctx, postLabel) 155 } 156 }(ctx, rec, did, rkey) 157 } 158 159 return nil 160} 161 162func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error { 163 var rec bsky.GraphFollow 164 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 165 return err 166 } 167 168 cat, err := parseTimeFromRecord(rec, rkey) 169 if err != nil { 170 return err 171 } 172 173 follow := models.Follow{ 174 Uri: uri, 175 Did: did, 176 Rkey: rkey, 177 CreatedAt: *cat, 178 IndexedAt: indexedAt, 179 Subject: rec.Subject, 180 } 181 182 if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil { 183 return err 184 } 185 186 return nil 187} 188 189func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error { 190 colPts := strings.Split(collection, ".") 191 if len(colPts) < 4 { 192 return fmt.Errorf("invalid collection type %s", collection) 193 } 194 195 interaction := models.Interaction{ 196 Uri: uri, 197 Kind: colPts[3], 198 Rkey: rkey, 199 IndexedAt: indexedAt, 200 Did: did, 201 SubjectUri: uri, 202 SubjectDid: did, 203 } 204 205 switch collection { 206 case "app.bsky.feed.like": 207 var rec bsky.FeedLike 208 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 209 return err 210 } 211 212 cat, err := parseTimeFromRecord(rec, rkey) 213 if err != nil { 214 return err 215 } 216 217 if rec.Subject == nil { 218 return fmt.Errorf("invalid subject in like") 219 } 220 221 aturi, err := syntax.ParseATURI(rec.Subject.Uri) 222 if err != nil { 223 return fmt.Errorf("error parsing at-uri: %w", err) 224 225 } 226 227 interaction.SubjectDid = aturi.Authority().String() 228 interaction.SubjectUri = rec.Subject.Uri 229 interaction.CreatedAt = *cat 230 case "app.bsky.feed.repost": 231 var rec bsky.FeedRepost 232 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 233 return err 234 } 235 236 cat, err := parseTimeFromRecord(rec, rkey) 237 if err != nil { 238 return err 239 } 240 241 if rec.Subject == nil { 242 return fmt.Errorf("invalid subject in repost") 243 } 244 245 aturi, err := syntax.ParseATURI(rec.Subject.Uri) 246 if err != nil { 247 return fmt.Errorf("error parsing at-uri: %w", err) 248 249 } 250 251 interaction.SubjectDid = aturi.Authority().String() 252 interaction.SubjectUri = rec.Subject.Uri 253 interaction.CreatedAt = *cat 254 } 255 256 if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil { 257 return err 258 } 259 260 return nil 261} 262 263func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) { 264 var rkeyTime time.Time 265 if rkey != "self" { 266 rt, err := syntax.ParseTID(rkey) 267 if err == nil { 268 rkeyTime = rt.Time() 269 } 270 } 271 switch rec := rec.(type) { 272 case *bsky.FeedPost: 273 t, err := dateparse.ParseAny(rec.CreatedAt) 274 if err != nil { 275 return nil, err 276 } 277 278 if inRange(t) { 279 return &t, nil 280 } 281 282 if rkeyTime.IsZero() || !inRange(rkeyTime) { 283 return timePtr(time.Now()), nil 284 } 285 286 return &rkeyTime, nil 287 case *bsky.FeedRepost: 288 t, err := dateparse.ParseAny(rec.CreatedAt) 289 if err != nil { 290 return nil, err 291 } 292 293 if inRange(t) { 294 return timePtr(t), nil 295 } 296 297 if rkeyTime.IsZero() { 298 return nil, fmt.Errorf("failed to get a useful timestamp from record") 299 } 300 301 return &rkeyTime, nil 302 case *bsky.FeedLike: 303 t, err := dateparse.ParseAny(rec.CreatedAt) 304 if err != nil { 305 return nil, err 306 } 307 308 if inRange(t) { 309 return timePtr(t), nil 310 } 311 312 if rkeyTime.IsZero() { 313 return nil, fmt.Errorf("failed to get a useful timestamp from record") 314 } 315 316 return &rkeyTime, nil 317 case *bsky.ActorProfile: 318 // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one 319 return timePtr(time.Now()), nil 320 case *bsky.FeedGenerator: 321 if !rkeyTime.IsZero() && inRange(rkeyTime) { 322 return &rkeyTime, nil 323 } 324 return timePtr(time.Now()), nil 325 default: 326 if !rkeyTime.IsZero() && inRange(rkeyTime) { 327 return &rkeyTime, nil 328 } 329 return timePtr(time.Now()), nil 330 } 331} 332 333func inRange(t time.Time) bool { 334 now := time.Now() 335 if t.Before(now) { 336 return now.Sub(t) <= time.Hour*24*365*5 337 } 338 return t.Sub(now) <= time.Hour*24*200 339} 340 341func timePtr(t time.Time) *time.Time { 342 return &t 343}