this repo has no description
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "log"
7 "strings"
8
9 appbsky "github.com/bluesky-social/indigo/api/bsky"
10 jetstream "github.com/bluesky-social/jetstream/pkg/models"
11 "github.com/pemistahl/lingua-go"
12 "github.com/redis/go-redis/v9"
13)
14
15func processPosts(ctx context.Context, events <-chan []byte) {
16 languages := []lingua.Language{
17 lingua.Portuguese,
18 lingua.English,
19 lingua.Japanese,
20 lingua.German,
21 lingua.French,
22 lingua.Spanish,
23 lingua.Korean,
24 lingua.Thai,
25 }
26 detector := lingua.
27 NewLanguageDetectorBuilder().
28 FromLanguages(languages...).
29 WithPreloadedLanguageModels().
30 Build()
31
32 rdb := redis.NewClient(&redis.Options{
33 Addr: "localhost:6379",
34 Password: "",
35 DB: 0,
36 })
37 pipe := rdb.Pipeline()
38
39 var (
40 count int
41 event jetstream.Event
42 post appbsky.FeedPost
43 langKey string
44 )
45
46 for message := range events {
47 event = jetstream.Event{}
48 if err := json.Unmarshal(message, &event); err != nil {
49 continue
50 }
51
52 if event.Kind != jetstream.EventKindCommit {
53 continue
54 }
55
56 if event.Commit.Operation != jetstream.CommitOperationCreate {
57 continue
58 }
59
60 commit := *event.Commit
61 post = appbsky.FeedPost{}
62 if err := json.Unmarshal(commit.Record, &post); err != nil {
63 log.Printf("error parsing appbsky.FeedPost: %v\n", err)
64 continue
65 }
66
67 if post.Text == "" {
68 continue
69 }
70
71 language, _ := detector.DetectLanguageOf(post.Text)
72 langKey = `bsky-langs:detected:` + strings.ToLower(language.IsoCode639_1().String())
73
74 if err := pipe.Incr(ctx, langKey).Err(); err != nil {
75 log.Printf("failed incrementing lang key: %v\n", err)
76 }
77
78 count += 1
79 if count%1000 == 0 {
80 if _, err := pipe.Exec(ctx); err != nil {
81 log.Printf("failed to execute pipe\n")
82 }
83 }
84 }
85
86}