this repo has no description
1package mostliked
2
3import (
4 "context"
5 "database/sql"
6 _ "embed"
7 "encoding/json"
8 "fmt"
9 "log"
10 "strings"
11 "time"
12
13 appbsky "github.com/bluesky-social/indigo/api/bsky"
14 jetstream "github.com/bluesky-social/jetstream/pkg/models"
15 db "github.com/edavis/bsky-feeds/db/mostliked"
16 "github.com/edavis/bsky-feeds/pkg/feeds"
17 "github.com/karlseguin/ccache/v3"
18 _ "github.com/mattn/go-sqlite3"
19 "github.com/pemistahl/lingua-go"
20)
21
// ddl holds the contents of schema.sql, embedded at build time. Handler
// executes it once on start-up to create the tables it writes to.
//
//go:embed schema.sql
var ddl string
24
const (
	// MinLikes is the like count a cached draft post has to reach before
	// Handler removes it from the draft cache and inserts it into the
	// database. It is left untyped so it can be compared directly against
	// the int64 like counter.
	MinLikes = 5
)
26
27type DraftPost struct {
28 Languages []lingua.Language
29 Created int64
30 Likes int64
31}
32
// CheckpointResults receives the single result row of
// "PRAGMA wal_checkpoint(RESTART)". The fields are listed in the order in
// which Handler scans the row's three columns: Blocked is compared against 1
// to report a blocked checkpoint, and Pages and Transferred are compared with
// each other to report whether every page was transferred.
type CheckpointResults struct {
	Blocked, Pages, Transferred int
}
38
39func findDetectableText(post appbsky.FeedPost) string {
40 // if we have text, detect against that
41 // no text but we do have images, detect against first alt text
42 // log.Printf("%+v\n", post)
43
44 if post.Text != "" {
45 return post.Text
46 } else if post.Embed != nil && post.Embed.EmbedImages != nil && post.Embed.EmbedImages.Images != nil {
47 for _, image := range post.Embed.EmbedImages.Images {
48 if image.Alt != "" {
49 return image.Alt
50 }
51 }
52 }
53 return ""
54}
55
56func Handler(ctx context.Context, events <-chan []byte, dbCnx *sql.DB) {
57 if _, err := dbCnx.ExecContext(ctx, ddl); err != nil {
58 log.Printf("could not create tables: %v\n", err)
59 }
60 if _, err := dbCnx.ExecContext(ctx, "PRAGMA wal_autocheckpoint = 0"); err != nil {
61 log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err)
62 }
63 queries := db.New(dbCnx)
64
65 drafts := ccache.New(ccache.Configure[DraftPost]().MaxSize(50_000).GetsPerPromote(1))
66
67 languages := []lingua.Language{
68 lingua.Portuguese,
69 lingua.English,
70 lingua.Japanese,
71 lingua.German,
72 lingua.French,
73 lingua.Spanish,
74 }
75 detector := lingua.
76 NewLanguageDetectorBuilder().
77 FromLanguages(languages...).
78 WithPreloadedLanguageModels().
79 Build()
80
81 var (
82 dbTx *sql.Tx
83 queriesTx *db.Queries
84 txOpen bool
85 err error
86 eventCount int
87 )
88
89forLoop:
90 for evt := range events {
91 select {
92 case <-ctx.Done():
93 break forLoop
94 default:
95 }
96
97 if !txOpen {
98 dbTx, err = dbCnx.BeginTx(ctx, nil)
99 if err != nil {
100 log.Printf("failed to begin transaction: %v\n", err)
101 }
102 txOpen = true
103 queriesTx = queries.WithTx(dbTx)
104 }
105
106 var like appbsky.FeedLike
107 var event jetstream.Event
108 if err := json.Unmarshal(evt, &event); err != nil {
109 continue
110 }
111 if event.Kind != jetstream.EventKindCommit {
112 continue
113 }
114 if event.Commit.Operation != jetstream.CommitOperationCreate {
115 continue
116 }
117 commit := *event.Commit
118 if commit.Collection == "app.bsky.feed.post" {
119 var post appbsky.FeedPost
120 postUri := fmt.Sprintf("at://%s/%s/%s", event.Did, commit.Collection, commit.RKey)
121 if err := json.Unmarshal(commit.Record, &post); err != nil {
122 log.Printf("error parsing appbsky.FeedPost: %v\n", err)
123 continue
124 }
125 draftPost := DraftPost{
126 Created: feeds.SafeTimestamp(post.CreatedAt),
127 Likes: 0,
128 }
129 if text := findDetectableText(post); text != "" {
130 language, _ := detector.DetectLanguageOf(text)
131 draftPost.Languages = []lingua.Language{language}
132 } else if len(post.Langs) > 0 {
133 var iso lingua.IsoCode639_1
134 for _, lang := range post.Langs {
135 iso = lingua.GetIsoCode639_1FromValue(lang)
136 draftPost.Languages = append(draftPost.Languages, lingua.GetLanguageFromIsoCode639_1(iso))
137 }
138 }
139 drafts.Set(postUri, draftPost, 30*time.Minute)
140 continue
141 } else if commit.Collection == "app.bsky.feed.like" {
142 if err := json.Unmarshal(commit.Record, &like); err != nil {
143 log.Printf("error parsing appbsky.FeedLike: %v\n", err)
144 continue
145 }
146 }
147
148 draft := drafts.Get(like.Subject.Uri)
149 if draft != nil {
150 draftPost := draft.Value()
151 draftPost.Likes = draftPost.Likes + 1
152 if draftPost.Likes < MinLikes {
153 drafts.Replace(like.Subject.Uri, draftPost)
154 continue
155 }
156 drafts.Delete(like.Subject.Uri)
157 err := queriesTx.InsertPost(ctx, db.InsertPostParams{
158 Uri: like.Subject.Uri,
159 CreateTs: draftPost.Created,
160 Likes: draftPost.Likes,
161 })
162 if err != nil {
163 log.Printf("error inserting post: %v\n", err)
164 }
165 for _, lang := range draftPost.Languages {
166 err = queriesTx.InsertLang(ctx, db.InsertLangParams{
167 Uri: like.Subject.Uri,
168 Lang: strings.ToLower(lang.IsoCode639_1().String()),
169 })
170 if err != nil {
171 log.Printf("error inserting lang: %v\n", err)
172 }
173 }
174 } else {
175 err := queriesTx.UpdateLikes(ctx, like.Subject.Uri)
176 if err != nil {
177 log.Printf("error updating likes: %v\n", err)
178 }
179 }
180
181 eventCount += 1
182 if eventCount%1000 == 0 {
183 if err := queriesTx.TrimPosts(ctx); err != nil {
184 log.Printf("error clearing expired posts: %v\n", err)
185 }
186
187 if err := dbTx.Commit(); err != nil {
188 log.Printf("commit failed: %v\n", err)
189 }
190
191 var results CheckpointResults
192 err := dbCnx.QueryRowContext(ctx, "PRAGMA wal_checkpoint(RESTART)").Scan(&results.Blocked, &results.Pages, &results.Transferred)
193 switch {
194 case err != nil:
195 log.Printf("failed checkpoint: %v\n", err)
196 case results.Blocked == 1:
197 log.Printf("checkpoint: blocked\n")
198 case results.Pages == results.Transferred:
199 log.Printf("checkpoint: %d pages transferred\n", results.Transferred)
200 case results.Pages != results.Transferred:
201 log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred)
202 }
203
204 txOpen = false
205 }
206 }
207}