this repo has no description
at master 5.4 kB view raw
1package mostliked 2 3import ( 4 "context" 5 "database/sql" 6 _ "embed" 7 "encoding/json" 8 "fmt" 9 "log" 10 "strings" 11 "time" 12 13 appbsky "github.com/bluesky-social/indigo/api/bsky" 14 jetstream "github.com/bluesky-social/jetstream/pkg/models" 15 db "github.com/edavis/bsky-feeds/db/mostliked" 16 "github.com/edavis/bsky-feeds/pkg/feeds" 17 "github.com/karlseguin/ccache/v3" 18 _ "github.com/mattn/go-sqlite3" 19 "github.com/pemistahl/lingua-go" 20) 21 22//go:embed schema.sql 23var ddl string 24 25const MinLikes = 5 26 27type DraftPost struct { 28 Languages []lingua.Language 29 Created int64 30 Likes int64 31} 32 33type CheckpointResults struct { 34 Blocked int 35 Pages int 36 Transferred int 37} 38 39func findDetectableText(post appbsky.FeedPost) string { 40 // if we have text, detect against that 41 // no text but we do have images, detect against first alt text 42 // log.Printf("%+v\n", post) 43 44 if post.Text != "" { 45 return post.Text 46 } else if post.Embed != nil && post.Embed.EmbedImages != nil && post.Embed.EmbedImages.Images != nil { 47 for _, image := range post.Embed.EmbedImages.Images { 48 if image.Alt != "" { 49 return image.Alt 50 } 51 } 52 } 53 return "" 54} 55 56func Handler(ctx context.Context, events <-chan []byte, dbCnx *sql.DB) { 57 if _, err := dbCnx.ExecContext(ctx, ddl); err != nil { 58 log.Printf("could not create tables: %v\n", err) 59 } 60 if _, err := dbCnx.ExecContext(ctx, "PRAGMA wal_autocheckpoint = 0"); err != nil { 61 log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err) 62 } 63 queries := db.New(dbCnx) 64 65 drafts := ccache.New(ccache.Configure[DraftPost]().MaxSize(50_000).GetsPerPromote(1)) 66 67 languages := []lingua.Language{ 68 lingua.Portuguese, 69 lingua.English, 70 lingua.Japanese, 71 lingua.German, 72 lingua.French, 73 lingua.Spanish, 74 } 75 detector := lingua. 76 NewLanguageDetectorBuilder(). 77 FromLanguages(languages...). 78 WithPreloadedLanguageModels(). 79 Build() 80 81 var ( 82 dbTx *sql.Tx 83 queriesTx *db.Queries 84 txOpen bool 85 err error 86 eventCount int 87 ) 88 89forLoop: 90 for evt := range events { 91 select { 92 case <-ctx.Done(): 93 break forLoop 94 default: 95 } 96 97 if !txOpen { 98 dbTx, err = dbCnx.BeginTx(ctx, nil) 99 if err != nil { 100 log.Printf("failed to begin transaction: %v\n", err) 101 } 102 txOpen = true 103 queriesTx = queries.WithTx(dbTx) 104 } 105 106 var like appbsky.FeedLike 107 var event jetstream.Event 108 if err := json.Unmarshal(evt, &event); err != nil { 109 continue 110 } 111 if event.Kind != jetstream.EventKindCommit { 112 continue 113 } 114 if event.Commit.Operation != jetstream.CommitOperationCreate { 115 continue 116 } 117 commit := *event.Commit 118 if commit.Collection == "app.bsky.feed.post" { 119 var post appbsky.FeedPost 120 postUri := fmt.Sprintf("at://%s/%s/%s", event.Did, commit.Collection, commit.RKey) 121 if err := json.Unmarshal(commit.Record, &post); err != nil { 122 log.Printf("error parsing appbsky.FeedPost: %v\n", err) 123 continue 124 } 125 draftPost := DraftPost{ 126 Created: feeds.SafeTimestamp(post.CreatedAt), 127 Likes: 0, 128 } 129 if text := findDetectableText(post); text != "" { 130 language, _ := detector.DetectLanguageOf(text) 131 draftPost.Languages = []lingua.Language{language} 132 } else if len(post.Langs) > 0 { 133 var iso lingua.IsoCode639_1 134 for _, lang := range post.Langs { 135 iso = lingua.GetIsoCode639_1FromValue(lang) 136 draftPost.Languages = append(draftPost.Languages, lingua.GetLanguageFromIsoCode639_1(iso)) 137 } 138 } 139 drafts.Set(postUri, draftPost, 30*time.Minute) 140 continue 141 } else if commit.Collection == "app.bsky.feed.like" { 142 if err := json.Unmarshal(commit.Record, &like); err != nil { 143 log.Printf("error parsing appbsky.FeedLike: %v\n", err) 144 continue 145 } 146 } 147 148 draft := drafts.Get(like.Subject.Uri) 149 if draft != nil { 150 draftPost := draft.Value() 151 draftPost.Likes = draftPost.Likes + 1 152 if draftPost.Likes < MinLikes { 153 drafts.Replace(like.Subject.Uri, draftPost) 154 continue 155 } 156 drafts.Delete(like.Subject.Uri) 157 err := queriesTx.InsertPost(ctx, db.InsertPostParams{ 158 Uri: like.Subject.Uri, 159 CreateTs: draftPost.Created, 160 Likes: draftPost.Likes, 161 }) 162 if err != nil { 163 log.Printf("error inserting post: %v\n", err) 164 } 165 for _, lang := range draftPost.Languages { 166 err = queriesTx.InsertLang(ctx, db.InsertLangParams{ 167 Uri: like.Subject.Uri, 168 Lang: strings.ToLower(lang.IsoCode639_1().String()), 169 }) 170 if err != nil { 171 log.Printf("error inserting lang: %v\n", err) 172 } 173 } 174 } else { 175 err := queriesTx.UpdateLikes(ctx, like.Subject.Uri) 176 if err != nil { 177 log.Printf("error updating likes: %v\n", err) 178 } 179 } 180 181 eventCount += 1 182 if eventCount%1000 == 0 { 183 if err := queriesTx.TrimPosts(ctx); err != nil { 184 log.Printf("error clearing expired posts: %v\n", err) 185 } 186 187 if err := dbTx.Commit(); err != nil { 188 log.Printf("commit failed: %v\n", err) 189 } 190 191 var results CheckpointResults 192 err := dbCnx.QueryRowContext(ctx, "PRAGMA wal_checkpoint(RESTART)").Scan(&results.Blocked, &results.Pages, &results.Transferred) 193 switch { 194 case err != nil: 195 log.Printf("failed checkpoint: %v\n", err) 196 case results.Blocked == 1: 197 log.Printf("checkpoint: blocked\n") 198 case results.Pages == results.Transferred: 199 log.Printf("checkpoint: %d pages transferred\n", results.Transferred) 200 case results.Pages != results.Transferred: 201 log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred) 202 } 203 204 txOpen = false 205 } 206 } 207}