this repo has no description
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "log"
7 "os"
8 "os/signal"
9 "strings"
10 "sync"
11 "syscall"
12 "time"
13
14 appbsky "github.com/bluesky-social/indigo/api/bsky"
15 jetstream "github.com/bluesky-social/jetstream/pkg/models"
16 "github.com/gorilla/websocket"
17 "github.com/redis/go-redis/v9"
18)
19
20type Queue struct {
21 lk sync.Mutex
22 events []jetstream.Event
23}
24
25func NewQueue(capacity int) *Queue {
26 return &Queue{
27 events: make([]jetstream.Event, 0, capacity),
28 }
29}
30
31func (q *Queue) Enqueue(event jetstream.Event) {
32 q.lk.Lock()
33 defer q.lk.Unlock()
34
35 q.events = append(q.events, event)
36}
37
38func (q *Queue) Dequeue() (jetstream.Event, bool) {
39 q.lk.Lock()
40 defer q.lk.Unlock()
41
42 var event jetstream.Event
43
44 if len(q.events) == 0 {
45 return event, false
46 }
47
48 event = q.events[0]
49 q.events = q.events[1:]
50 return event, true
51}
52
53func (q *Queue) Size() int {
54 q.lk.Lock()
55 defer q.lk.Unlock()
56
57 return len(q.events)
58}
59
const (
	// JetstreamUrl is the Jetstream subscribe endpoint that main dials at
	// startup.
	JetstreamUrl = "wss://jetstream1.us-west.bsky.network/subscribe"
)
61
// AppBskyAllowlist lists the collections whose record creations are counted
// by exact collection name.
var AppBskyAllowlist = map[string]bool{
	"app.bsky.actor.profile":      true,
	"app.bsky.feed.generator":     true,
	"app.bsky.feed.like":          true,
	"app.bsky.feed.post":          true,
	"app.bsky.feed.postgate":      true,
	"app.bsky.feed.repost":        true,
	"app.bsky.feed.threadgate":    true,
	"app.bsky.graph.block":        true,
	"app.bsky.graph.follow":       true,
	"app.bsky.graph.list":         true,
	"app.bsky.graph.listblock":    true,
	"app.bsky.graph.listitem":     true,
	"app.bsky.graph.starterpack":  true,
	"app.bsky.labeler.service":    true,
	"chat.bsky.actor.declaration": true,
}

// AtprotoAllowlist lists namespace prefixes whose record creations are
// counted. A collection is under a namespace when it equals the key or
// continues it with further dot-separated segments, e.g. "com.whtwnd"
// covers "com.whtwnd.blog.entry" but not "com.whtwndx.blog.entry".
var AtprotoAllowlist = map[string]bool{
	"social.psky":           true,
	"blue.zio.atfile":       true,
	"com.shinolabs.pinksea": true,
	"com.whtwnd":            true,
	"events.smokesignal":    true,
	"fyi.unravel":           true,
	"xyz.statusphere":       true,
}

// trackedRecordType reports whether records in collection should be counted:
// the collection is a key of AppBskyAllowlist, or it lies under one of the
// namespaces keyed in AtprotoAllowlist.
func trackedRecordType(collection string) bool {
	// Key presence (not the stored value) decides membership, matching how
	// the allowlists are iterated elsewhere.
	if _, ok := AppBskyAllowlist[collection]; ok {
		return true
	}
	for ns := range AtprotoAllowlist {
		// Require a "." after the namespace so that a different authority
		// which merely shares leading characters is not matched.
		if collection == ns || strings.HasPrefix(collection, ns+".") {
			return true
		}
	}
	return false
}
103
104func handler(ctx context.Context, queue *Queue) {
105 rdb := redis.NewClient(&redis.Options{
106 Addr: "localhost:6379",
107 Password: "",
108 DB: 0,
109 })
110 pipe := rdb.Pipeline()
111 var eventCount int
112
113eventLoop:
114 for {
115 select {
116 case <-ctx.Done():
117 break eventLoop
118 default:
119 }
120
121 event, ok := queue.Dequeue()
122 if !ok {
123 time.Sleep(100 * time.Millisecond)
124 continue
125 }
126
127 if event.Kind != jetstream.EventKindCommit {
128 continue
129 }
130 if event.Commit.Operation != jetstream.CommitOperationCreate {
131 continue
132 }
133
134 commit := *event.Commit
135 collection := commit.Collection
136
137 // if collection doesn't start with either allowlist, continue
138 if !trackedRecordType(collection) {
139 continue
140 }
141
142 // if collection starts with one of the Atproto allowlist keys, incr
143 for k, _ := range AtprotoAllowlist {
144 if strings.HasPrefix(collection, k) {
145 ckey := strings.ReplaceAll(k, ".", "_")
146 if err := pipe.Incr(ctx, "dev.edavis.atproto.collection."+ckey).Err(); err != nil {
147 log.Printf("failed incrementing an atproto collection: %v\n", err)
148 }
149 }
150 }
151
152 // if a post with an embed, incr that $embed type
153 if collection == "app.bsky.feed.post" {
154 var post appbsky.FeedPost
155 if err := json.Unmarshal(commit.Record, &post); err != nil {
156 log.Printf("error parsing appbsky.FeedPost: %v\n", err)
157 }
158 if post.Embed != nil {
159 var ekey string
160 switch {
161 case post.Embed.EmbedImages != nil:
162 ekey = post.Embed.EmbedImages.LexiconTypeID
163 case post.Embed.EmbedVideo != nil:
164 ekey = post.Embed.EmbedVideo.LexiconTypeID
165 case post.Embed.EmbedExternal != nil:
166 ekey = post.Embed.EmbedExternal.LexiconTypeID
167 case post.Embed.EmbedRecord != nil:
168 ekey = post.Embed.EmbedRecord.LexiconTypeID
169 case post.Embed.EmbedRecordWithMedia != nil:
170 ekey = post.Embed.EmbedRecordWithMedia.LexiconTypeID
171 }
172 if ekey == "" {
173 continue
174 }
175 if err := pipe.Incr(ctx, "app.bsky.feed.post:embed:"+ekey).Err(); err != nil {
176 log.Printf("failed incrementing embed key: %v\n", err)
177 }
178 }
179 }
180
181 // incr the collection and ops
182 if err := pipe.Incr(ctx, collection).Err(); err != nil {
183 log.Printf("failed incrementing collection: %v\n", err)
184 }
185
186 if err := pipe.Incr(ctx, `dev.edavis.muninsky.ops`).Err(); err != nil {
187 log.Printf("failed incrementing ops: %v\n", err)
188 }
189
190 // add one to the count, every 500 ops execute the piepline
191 eventCount += 1
192 if eventCount%2500 == 0 {
193 if _, err := pipe.Exec(ctx); err != nil {
194 log.Printf("failed to exec pipe\n")
195 }
196 log.Printf("queue size: %d\n", queue.Size())
197 }
198 }
199}
200
201func main() {
202 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
203 defer stop()
204
205 conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil)
206 if err != nil {
207 log.Fatalf("failed to open websocket: %v\n", err)
208 }
209 defer func() {
210 if err := conn.Close(); err != nil {
211 log.Printf("failed to close websocket: %v\n", err)
212 }
213 log.Printf("websocket closed\n")
214 }()
215
216 queue := NewQueue(100_000)
217 go handler(ctx, queue)
218
219 log.Printf("starting up\n")
220 go func() {
221 for {
222 var event jetstream.Event
223 err := conn.ReadJSON(&event)
224 if err != nil {
225 log.Printf("ReadJSON error: %v\n", err)
226 stop()
227 break
228 }
229 queue.Enqueue(event)
230 }
231 }()
232
233 <-ctx.Done()
234 log.Printf("shutting down\n")
235}