this repo has no description
1package photocopy 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "strings" 8 "time" 9 10 "github.com/araddon/dateparse" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "github.com/haileyok/photocopy/models" 14) 15 16func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error { 17 iat, err := dateparse.ParseAny(indexedAt) 18 if err != nil { 19 return err 20 } 21 22 if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil { 23 p.logger.Error("error creating record", "error", err) 24 } 25 26 switch collection { 27 case "app.bsky.feed.post": 28 return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat) 29 case "app.bsky.graph.follow": 30 return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat) 31 case "app.bsky.feed.like", "app.bsky.feed.repost": 32 return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat) 33 default: 34 return nil 35 } 36} 37 38func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error { 39 var cat time.Time 40 prkey, err := syntax.ParseTID(rkey) 41 if err == nil { 42 cat = prkey.Time() 43 } else { 44 cat = time.Now() 45 } 46 47 rec := models.Record{ 48 Did: did, 49 Rkey: rkey, 50 Collection: collection, 51 Cid: cid, 52 Seq: seq, 53 Raw: raw, 54 CreatedAt: cat, 55 } 56 57 if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil { 58 return err 59 } 60 61 return nil 62} 63 64func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error { 65 var rec bsky.FeedPost 66 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 67 return err 68 } 69 70 cat, err := parseTimeFromRecord(rec, rkey) 71 if err != nil { 72 return err 73 } 74 75 post := models.Post{ 76 Uri: uri, 77 Rkey: rkey, 78 CreatedAt: *cat, 79 IndexedAt: indexedAt, 80 Did: did, 81 } 82 83 if rec.Reply != nil { 84 if rec.Reply.Parent != nil { 85 aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri) 86 if err != nil { 87 return fmt.Errorf("error parsing at-uri: %w", err) 88 89 } 90 post.ParentDid = aturi.Authority().String() 91 post.ParentUri = rec.Reply.Parent.Uri 92 } 93 if rec.Reply.Root != nil { 94 aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri) 95 if err != nil { 96 return fmt.Errorf("error parsing at-uri: %w", err) 97 98 } 99 post.RootDid = aturi.Authority().String() 100 post.RootUri = rec.Reply.Root.Uri 101 } 102 } 103 104 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil { 105 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri) 106 if err != nil { 107 return fmt.Errorf("error parsing at-uri: %w", err) 108 109 } 110 post.QuoteDid = aturi.Authority().String() 111 post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri 112 } else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil { 113 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri) 114 if err != nil { 115 return fmt.Errorf("error parsing at-uri: %w", err) 116 117 } 118 post.QuoteDid = aturi.Authority().String() 119 post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri 120 } 121 122 if err := p.inserters.postsInserter.Insert(ctx, post); err != nil { 123 return err 124 } 125 126 return nil 127} 128 129func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error { 130 var rec bsky.GraphFollow 131 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 132 return err 133 } 134 135 cat, err := parseTimeFromRecord(rec, rkey) 136 if err != nil { 137 return err 138 } 139 140 follow := models.Follow{ 141 Uri: uri, 142 Did: did, 143 Rkey: rkey, 144 CreatedAt: *cat, 145 IndexedAt: indexedAt, 146 Subject: rec.Subject, 147 } 148 149 if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil { 150 return err 151 } 152 153 return nil 154} 155 156func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error { 157 colPts := strings.Split(collection, ".") 158 if len(colPts) < 4 { 159 return fmt.Errorf("invalid collection type %s", collection) 160 } 161 162 interaction := models.Interaction{ 163 Uri: uri, 164 Kind: colPts[3], 165 Rkey: rkey, 166 IndexedAt: indexedAt, 167 Did: did, 168 SubjectUri: uri, 169 SubjectDid: did, 170 } 171 172 switch collection { 173 case "app.bsky.feed.like": 174 var rec bsky.FeedLike 175 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 176 return err 177 } 178 179 cat, err := parseTimeFromRecord(rec, rkey) 180 if err != nil { 181 return err 182 } 183 184 if rec.Subject == nil { 185 return fmt.Errorf("invalid subject in like") 186 } 187 188 aturi, err := syntax.ParseATURI(rec.Subject.Uri) 189 if err != nil { 190 return fmt.Errorf("error parsing at-uri: %w", err) 191 192 } 193 194 interaction.SubjectDid = aturi.Authority().String() 195 interaction.SubjectUri = rec.Subject.Uri 196 interaction.CreatedAt = *cat 197 case "app.bsky.feed.repost": 198 var rec bsky.FeedRepost 199 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 200 return err 201 } 202 203 cat, err := parseTimeFromRecord(rec, rkey) 204 if err != nil { 205 return err 206 } 207 208 if rec.Subject == nil { 209 return fmt.Errorf("invalid subject in repost") 210 } 211 212 aturi, err := syntax.ParseATURI(rec.Subject.Uri) 213 if err != nil { 214 return fmt.Errorf("error parsing at-uri: %w", err) 215 216 } 217 218 interaction.SubjectDid = aturi.Authority().String() 219 interaction.SubjectUri = rec.Subject.Uri 220 interaction.CreatedAt = *cat 221 } 222 223 if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil { 224 return err 225 } 226 227 return nil 228} 229 230func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) { 231 var rkeyTime time.Time 232 if rkey != "self" { 233 rt, err := syntax.ParseTID(rkey) 234 if err == nil { 235 rkeyTime = rt.Time() 236 } 237 } 238 switch rec := rec.(type) { 239 case *bsky.FeedPost: 240 t, err := dateparse.ParseAny(rec.CreatedAt) 241 if err != nil { 242 return nil, err 243 } 244 245 if inRange(t) { 246 return &t, nil 247 } 248 249 if rkeyTime.IsZero() || !inRange(rkeyTime) { 250 return timePtr(time.Now()), nil 251 } 252 253 return &rkeyTime, nil 254 case *bsky.FeedRepost: 255 t, err := dateparse.ParseAny(rec.CreatedAt) 256 if err != nil { 257 return nil, err 258 } 259 260 if inRange(t) { 261 return timePtr(t), nil 262 } 263 264 if rkeyTime.IsZero() { 265 return nil, fmt.Errorf("failed to get a useful timestamp from record") 266 } 267 268 return &rkeyTime, nil 269 case *bsky.FeedLike: 270 t, err := dateparse.ParseAny(rec.CreatedAt) 271 if err != nil { 272 return nil, err 273 } 274 275 if inRange(t) { 276 return timePtr(t), nil 277 } 278 279 if rkeyTime.IsZero() { 280 return nil, fmt.Errorf("failed to get a useful timestamp from record") 281 } 282 283 return &rkeyTime, nil 284 case *bsky.ActorProfile: 285 // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one 286 return timePtr(time.Now()), nil 287 case *bsky.FeedGenerator: 288 if !rkeyTime.IsZero() && inRange(rkeyTime) { 289 return &rkeyTime, nil 290 } 291 return timePtr(time.Now()), nil 292 default: 293 if !rkeyTime.IsZero() && inRange(rkeyTime) { 294 return &rkeyTime, nil 295 } 296 return timePtr(time.Now()), nil 297 } 298} 299 300func inRange(t time.Time) bool { 301 now := time.Now() 302 if t.Before(now) { 303 return now.Sub(t) <= time.Hour*24*365*5 304 } 305 return t.Sub(now) <= time.Hour*24*200 306} 307 308func timePtr(t time.Time) *time.Time { 309 return &t 310}