this repo has no description
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "log"
7 "strings"
8
9 appbsky "github.com/bluesky-social/indigo/api/bsky"
10 jetstream "github.com/bluesky-social/jetstream/pkg/models"
11 "github.com/pemistahl/lingua-go"
12 "github.com/redis/go-redis/v9"
13)
14
15func processPosts(ctx context.Context, events <-chan []byte) {
16 languages := []lingua.Language{
17 lingua.Portuguese,
18 lingua.English,
19 lingua.Japanese,
20 lingua.German,
21 lingua.French,
22 lingua.Spanish,
23 }
24 detector := lingua.
25 NewLanguageDetectorBuilder().
26 FromLanguages(languages...).
27 WithPreloadedLanguageModels().
28 Build()
29
30 rdb := redis.NewClient(&redis.Options{
31 Addr: "localhost:6379",
32 Password: "",
33 DB: 0,
34 })
35 pipe := rdb.Pipeline()
36
37 var (
38 count int
39 event jetstream.Event
40 post appbsky.FeedPost
41 langKey string
42 )
43
44 for message := range events {
45 event = jetstream.Event{}
46 if err := json.Unmarshal(message, &event); err != nil {
47 continue
48 }
49
50 if event.Kind != jetstream.EventKindCommit {
51 continue
52 }
53
54 if event.Commit.Operation != jetstream.CommitOperationCreate {
55 continue
56 }
57
58 commit := *event.Commit
59 post = appbsky.FeedPost{}
60 if err := json.Unmarshal(commit.Record, &post); err != nil {
61 log.Printf("error parsing appbsky.FeedPost: %v\n", err)
62 continue
63 }
64
65 if post.Text == "" {
66 continue
67 }
68
69 language, _ := detector.DetectLanguageOf(post.Text)
70 langKey = `bsky-langs:detected:` + strings.ToLower(language.IsoCode639_1().String())
71
72 if err := pipe.Incr(ctx, langKey).Err(); err != nil {
73 log.Printf("failed incrementing lang key: %v\n", err)
74 }
75
76 count += 1
77 if count%1000 == 0 {
78 if _, err := pipe.Exec(ctx); err != nil {
79 log.Printf("failed to execute pipe\n")
80 }
81 }
82 }
83
84}