this repo has no description
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "log" 7 "os" 8 "os/signal" 9 "strings" 10 "sync" 11 "syscall" 12 "time" 13 14 appbsky "github.com/bluesky-social/indigo/api/bsky" 15 jetstream "github.com/bluesky-social/jetstream/pkg/models" 16 "github.com/gorilla/websocket" 17 "github.com/redis/go-redis/v9" 18) 19 20type Queue struct { 21 lk sync.Mutex 22 events []jetstream.Event 23} 24 25func NewQueue(capacity int) *Queue { 26 return &Queue{ 27 events: make([]jetstream.Event, 0, capacity), 28 } 29} 30 31func (q *Queue) Enqueue(event jetstream.Event) { 32 q.lk.Lock() 33 defer q.lk.Unlock() 34 35 q.events = append(q.events, event) 36} 37 38func (q *Queue) Dequeue() (jetstream.Event, bool) { 39 q.lk.Lock() 40 defer q.lk.Unlock() 41 42 var event jetstream.Event 43 44 if len(q.events) == 0 { 45 return event, false 46 } 47 48 event = q.events[0] 49 q.events = q.events[1:] 50 return event, true 51} 52 53func (q *Queue) Size() int { 54 q.lk.Lock() 55 defer q.lk.Unlock() 56 57 return len(q.events) 58} 59 60const JetstreamUrl = `wss://jetstream1.us-west.bsky.network/subscribe` 61 62var AppBskyAllowlist = map[string]bool{ 63 "app.bsky.actor.profile": true, 64 "app.bsky.feed.generator": true, 65 "app.bsky.feed.like": true, 66 "app.bsky.feed.post": true, 67 "app.bsky.feed.postgate": true, 68 "app.bsky.feed.repost": true, 69 "app.bsky.feed.threadgate": true, 70 "app.bsky.graph.block": true, 71 "app.bsky.graph.follow": true, 72 "app.bsky.graph.list": true, 73 "app.bsky.graph.listblock": true, 74 "app.bsky.graph.listitem": true, 75 "app.bsky.graph.starterpack": true, 76 "app.bsky.labeler.service": true, 77 "chat.bsky.actor.declaration": true, 78} 79 80var AtprotoAllowlist = map[string]bool{ 81 "social.psky": true, 82 "blue.zio.atfile": true, 83 "com.shinolabs.pinksea": true, 84 "com.whtwnd": true, 85 "events.smokesignal": true, 86 "fyi.unravel": true, 87 "xyz.statusphere": true, 88} 89 90func trackedRecordType(collection string) bool { 91 for k, _ := range AppBskyAllowlist { 92 if collection == k { 93 return true 94 } 95 } 96 for k, _ := range AtprotoAllowlist { 97 if strings.HasPrefix(collection, k) { 98 return true 99 } 100 } 101 return false 102} 103 104func handler(ctx context.Context, queue *Queue) { 105 rdb := redis.NewClient(&redis.Options{ 106 Addr: "localhost:6379", 107 Password: "", 108 DB: 0, 109 }) 110 pipe := rdb.Pipeline() 111 var eventCount int 112 113eventLoop: 114 for { 115 select { 116 case <-ctx.Done(): 117 break eventLoop 118 default: 119 } 120 121 event, ok := queue.Dequeue() 122 if !ok { 123 time.Sleep(100 * time.Millisecond) 124 continue 125 } 126 127 if event.Kind != jetstream.EventKindCommit { 128 continue 129 } 130 if event.Commit.Operation != jetstream.CommitOperationCreate { 131 continue 132 } 133 134 commit := *event.Commit 135 collection := commit.Collection 136 137 // if collection doesn't start with either allowlist, continue 138 if !trackedRecordType(collection) { 139 continue 140 } 141 142 // if collection starts with one of the Atproto allowlist keys, incr 143 for k, _ := range AtprotoAllowlist { 144 if strings.HasPrefix(collection, k) { 145 ckey := strings.ReplaceAll(k, ".", "_") 146 if err := pipe.Incr(ctx, "dev.edavis.atproto.collection."+ckey).Err(); err != nil { 147 log.Printf("failed incrementing an atproto collection: %v\n", err) 148 } 149 } 150 } 151 152 // if a post with an embed, incr that $embed type 153 if collection == "app.bsky.feed.post" { 154 var post appbsky.FeedPost 155 if err := json.Unmarshal(commit.Record, &post); err != nil { 156 log.Printf("error parsing appbsky.FeedPost: %v\n", err) 157 } 158 if post.Embed != nil { 159 var ekey string 160 switch { 161 case post.Embed.EmbedImages != nil: 162 ekey = post.Embed.EmbedImages.LexiconTypeID 163 case post.Embed.EmbedVideo != nil: 164 ekey = post.Embed.EmbedVideo.LexiconTypeID 165 case post.Embed.EmbedExternal != nil: 166 ekey = post.Embed.EmbedExternal.LexiconTypeID 167 case post.Embed.EmbedRecord != nil: 168 ekey = post.Embed.EmbedRecord.LexiconTypeID 169 case post.Embed.EmbedRecordWithMedia != nil: 170 ekey = post.Embed.EmbedRecordWithMedia.LexiconTypeID 171 } 172 if ekey == "" { 173 continue 174 } 175 if err := pipe.Incr(ctx, "app.bsky.feed.post:embed:"+ekey).Err(); err != nil { 176 log.Printf("failed incrementing embed key: %v\n", err) 177 } 178 } 179 } 180 181 // incr the collection and ops 182 if err := pipe.Incr(ctx, collection).Err(); err != nil { 183 log.Printf("failed incrementing collection: %v\n", err) 184 } 185 186 if err := pipe.Incr(ctx, `dev.edavis.muninsky.ops`).Err(); err != nil { 187 log.Printf("failed incrementing ops: %v\n", err) 188 } 189 190 // add one to the count, every 500 ops execute the piepline 191 eventCount += 1 192 if eventCount%2500 == 0 { 193 if _, err := pipe.Exec(ctx); err != nil { 194 log.Printf("failed to exec pipe\n") 195 } 196 log.Printf("queue size: %d\n", queue.Size()) 197 } 198 } 199} 200 201func main() { 202 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) 203 defer stop() 204 205 conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil) 206 if err != nil { 207 log.Fatalf("failed to open websocket: %v\n", err) 208 } 209 defer func() { 210 if err := conn.Close(); err != nil { 211 log.Printf("failed to close websocket: %v\n", err) 212 } 213 log.Printf("websocket closed\n") 214 }() 215 216 queue := NewQueue(100_000) 217 go handler(ctx, queue) 218 219 log.Printf("starting up\n") 220 go func() { 221 for { 222 var event jetstream.Event 223 err := conn.ReadJSON(&event) 224 if err != nil { 225 log.Printf("ReadJSON error: %v\n", err) 226 stop() 227 break 228 } 229 queue.Enqueue(event) 230 } 231 }() 232 233 <-ctx.Done() 234 log.Printf("shutting down\n") 235}