this repo has no description
at master 1.8 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "log" 7 "strings" 8 9 appbsky "github.com/bluesky-social/indigo/api/bsky" 10 jetstream "github.com/bluesky-social/jetstream/pkg/models" 11 "github.com/pemistahl/lingua-go" 12 "github.com/redis/go-redis/v9" 13) 14 15func processPosts(ctx context.Context, events <-chan []byte) { 16 languages := []lingua.Language{ 17 lingua.Portuguese, 18 lingua.English, 19 lingua.Japanese, 20 lingua.German, 21 lingua.French, 22 lingua.Spanish, 23 lingua.Korean, 24 lingua.Thai, 25 } 26 detector := lingua. 27 NewLanguageDetectorBuilder(). 28 FromLanguages(languages...). 29 WithPreloadedLanguageModels(). 30 Build() 31 32 rdb := redis.NewClient(&redis.Options{ 33 Addr: "localhost:6379", 34 Password: "", 35 DB: 0, 36 }) 37 pipe := rdb.Pipeline() 38 39 var ( 40 count int 41 event jetstream.Event 42 post appbsky.FeedPost 43 langKey string 44 ) 45 46 for message := range events { 47 event = jetstream.Event{} 48 if err := json.Unmarshal(message, &event); err != nil { 49 continue 50 } 51 52 if event.Kind != jetstream.EventKindCommit { 53 continue 54 } 55 56 if event.Commit.Operation != jetstream.CommitOperationCreate { 57 continue 58 } 59 60 commit := *event.Commit 61 post = appbsky.FeedPost{} 62 if err := json.Unmarshal(commit.Record, &post); err != nil { 63 log.Printf("error parsing appbsky.FeedPost: %v\n", err) 64 continue 65 } 66 67 if post.Text == "" { 68 continue 69 } 70 71 language, _ := detector.DetectLanguageOf(post.Text) 72 langKey = `bsky-langs:detected:` + strings.ToLower(language.IsoCode639_1().String()) 73 74 if err := pipe.Incr(ctx, langKey).Err(); err != nil { 75 log.Printf("failed incrementing lang key: %v\n", err) 76 } 77 78 count += 1 79 if count%1000 == 0 { 80 if _, err := pipe.Exec(ctx); err != nil { 81 log.Printf("failed to execute pipe\n") 82 } 83 } 84 } 85 86}