this repo has no description
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "log" 7 "os" 8 "os/signal" 9 "strings" 10 "syscall" 11 12 appbsky "github.com/bluesky-social/indigo/api/bsky" 13 jetstream "github.com/bluesky-social/jetstream/pkg/models" 14 "github.com/gorilla/websocket" 15 "github.com/redis/go-redis/v9" 16) 17 18const JetstreamUrl = `wss://jetstream1.us-west.bsky.network/subscribe` 19 20var AppBskyAllowlist = map[string]bool{ 21 "app.bsky.actor.profile": true, 22 "app.bsky.feed.generator": true, 23 "app.bsky.feed.like": true, 24 "app.bsky.feed.post": true, 25 "app.bsky.feed.postgate": true, 26 "app.bsky.feed.repost": true, 27 "app.bsky.feed.threadgate": true, 28 "app.bsky.graph.block": true, 29 "app.bsky.graph.follow": true, 30 "app.bsky.graph.list": true, 31 "app.bsky.graph.listblock": true, 32 "app.bsky.graph.listitem": true, 33 "app.bsky.graph.starterpack": true, 34 "app.bsky.labeler.service": true, 35 "chat.bsky.actor.declaration": true, 36} 37 38var AtprotoAllowlist = map[string]bool{ 39 "social.psky": true, 40 "blue.zio.atfile": true, 41 "com.shinolabs.pinksea": true, 42 "com.whtwnd": true, 43 "events.smokesignal": true, 44 "fyi.unravel": true, 45 "xyz.statusphere": true, 46} 47 48func trackedRecordType(collection string) bool { 49 for k, _ := range AppBskyAllowlist { 50 if collection == k { 51 return true 52 } 53 } 54 for k, _ := range AtprotoAllowlist { 55 if strings.HasPrefix(collection, k) { 56 return true 57 } 58 } 59 return false 60} 61 62func handler(ctx context.Context, events <-chan jetstream.Event) { 63 rdb := redis.NewClient(&redis.Options{ 64 Addr: "localhost:6379", 65 Password: "", 66 DB: 0, 67 }) 68 pipe := rdb.Pipeline() 69 var eventCount int 70 71eventLoop: 72 for event := range events { 73 select { 74 case <-ctx.Done(): 75 break eventLoop 76 default: 77 } 78 79 if event.Kind != jetstream.EventKindCommit { 80 continue 81 } 82 if event.Commit.Operation != jetstream.CommitOperationCreate { 83 continue 84 } 85 86 commit := *event.Commit 87 collection := commit.Collection 88 89 // if collection doesn't start with either allowlist, continue 90 if !trackedRecordType(collection) { 91 continue 92 } 93 94 // if collection starts with one of the Atproto allowlist keys, incr 95 for k, _ := range AtprotoAllowlist { 96 if strings.HasPrefix(collection, k) { 97 ckey := strings.ReplaceAll(collection, ".", "_") 98 if err := pipe.Incr(ctx, "dev.edavis.atproto.collection."+ckey); err != nil { 99 log.Printf("failed incrementing an atproto collection: %v\n", err) 100 } 101 } 102 } 103 104 // if a post with an embed, incr that $embed type 105 if collection == "app.bsky.feed.post" { 106 var post appbsky.FeedPost 107 if err := json.Unmarshal(commit.Record, &post); err != nil { 108 log.Printf("error parsing appbsky.FeedPost: %v\n", err) 109 } 110 if post.Embed != nil { 111 var ekey string 112 switch { 113 case post.Embed.EmbedImages != nil: 114 ekey = post.Embed.EmbedImages.LexiconTypeID 115 case post.Embed.EmbedVideo != nil: 116 ekey = post.Embed.EmbedVideo.LexiconTypeID 117 case post.Embed.EmbedExternal != nil: 118 ekey = post.Embed.EmbedExternal.LexiconTypeID 119 case post.Embed.EmbedRecord != nil: 120 ekey = post.Embed.EmbedRecord.LexiconTypeID 121 case post.Embed.EmbedRecordWithMedia != nil: 122 ekey = post.Embed.EmbedRecordWithMedia.LexiconTypeID 123 } 124 if ekey == "" { 125 continue 126 } 127 if err := pipe.Incr(ctx, "app.bsky.feed.post:embed:"+ekey).Err(); err != nil { 128 log.Printf("failed incrementing embed key: %v\n", err) 129 } 130 } 131 } 132 133 // incr the collection and ops 134 if err := pipe.Incr(ctx, collection).Err(); err != nil { 135 log.Printf("failed incrementing collection: %v\n", err) 136 } 137 138 if err := pipe.Incr(ctx, `dev.edavis.muninsky.ops`).Err(); err != nil { 139 log.Printf("failed incrementing ops: %v\n", err) 140 } 141 142 // add one to the count, every 500 ops execute the piepline 143 eventCount += 1 144 if eventCount%500 == 0 { 145 if _, err := pipe.Exec(ctx); err != nil { 146 log.Printf("failed to exec pipe\n") 147 } 148 } 149 } 150} 151 152func main() { 153 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) 154 defer stop() 155 156 conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil) 157 if err != nil { 158 log.Fatalf("failed to open websocket: %v\n", err) 159 } 160 defer func() { 161 if err := conn.Close(); err != nil { 162 log.Printf("failed to close websocket: %v\n", err) 163 } 164 log.Printf("websocket closed\n") 165 }() 166 167 jetstreamEvents := make(chan jetstream.Event) 168 go handler(ctx, jetstreamEvents) 169 170 log.Printf("starting up\n") 171 var event jetstream.Event 172 go func() { 173 for { 174 event = jetstream.Event{} 175 err := conn.ReadJSON(&event) 176 if err != nil { 177 log.Printf("ReadJSON error: %v\n", err) 178 stop() 179 break 180 } else { 181 jetstreamEvents <- event 182 } 183 } 184 }() 185 186 <-ctx.Done() 187 log.Printf("shutting down\n") 188}