this repo has no description
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "log" 7 "strings" 8 9 appbsky "github.com/bluesky-social/indigo/api/bsky" 10 jetstream "github.com/bluesky-social/jetstream/pkg/models" 11 "github.com/pemistahl/lingua-go" 12 "github.com/redis/go-redis/v9" 13) 14 15func processPosts(ctx context.Context, events <-chan []byte) { 16 languages := []lingua.Language{ 17 lingua.Portuguese, 18 lingua.English, 19 lingua.Japanese, 20 lingua.German, 21 lingua.French, 22 lingua.Spanish, 23 } 24 detector := lingua. 25 NewLanguageDetectorBuilder(). 26 FromLanguages(languages...). 27 WithPreloadedLanguageModels(). 28 Build() 29 30 rdb := redis.NewClient(&redis.Options{ 31 Addr: "localhost:6379", 32 Password: "", 33 DB: 0, 34 }) 35 pipe := rdb.Pipeline() 36 37 var ( 38 count int 39 event jetstream.Event 40 post appbsky.FeedPost 41 langKey string 42 ) 43 44 for message := range events { 45 event = jetstream.Event{} 46 if err := json.Unmarshal(message, &event); err != nil { 47 continue 48 } 49 50 if event.Kind != jetstream.EventKindCommit { 51 continue 52 } 53 54 if event.Commit.Operation != jetstream.CommitOperationCreate { 55 continue 56 } 57 58 commit := *event.Commit 59 post = appbsky.FeedPost{} 60 if err := json.Unmarshal(commit.Record, &post); err != nil { 61 log.Printf("error parsing appbsky.FeedPost: %v\n", err) 62 continue 63 } 64 65 if post.Text == "" { 66 continue 67 } 68 69 language, _ := detector.DetectLanguageOf(post.Text) 70 langKey = `bsky-langs:detected:` + strings.ToLower(language.IsoCode639_1().String()) 71 72 if err := pipe.Incr(ctx, langKey).Err(); err != nil { 73 log.Printf("failed incrementing lang key: %v\n", err) 74 } 75 76 count += 1 77 if count%1000 == 0 { 78 if _, err := pipe.Exec(ctx); err != nil { 79 log.Printf("failed to execute pipe\n") 80 } 81 } 82 } 83 84}