this repo has no description
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "log"
7 "os"
8 "os/signal"
9 "strings"
10 "syscall"
11
12 appbsky "github.com/bluesky-social/indigo/api/bsky"
13 jetstream "github.com/bluesky-social/jetstream/pkg/models"
14 "github.com/gorilla/websocket"
15 "github.com/redis/go-redis/v9"
16)
17
// JetstreamUrl is the websocket endpoint main dials to subscribe to the
// Jetstream event feed.
const JetstreamUrl = "wss://jetstream1.us-west.bsky.network/subscribe"
19
// AppBskyAllowlist is the set of Bluesky collection names that are tracked
// when a record's collection equals one of these keys exactly.
var AppBskyAllowlist = map[string]bool{
	// actor
	"app.bsky.actor.profile": true,

	// feed
	"app.bsky.feed.generator":  true,
	"app.bsky.feed.like":       true,
	"app.bsky.feed.post":       true,
	"app.bsky.feed.postgate":   true,
	"app.bsky.feed.repost":     true,
	"app.bsky.feed.threadgate": true,

	// graph
	"app.bsky.graph.block":       true,
	"app.bsky.graph.follow":      true,
	"app.bsky.graph.list":        true,
	"app.bsky.graph.listblock":   true,
	"app.bsky.graph.listitem":    true,
	"app.bsky.graph.starterpack": true,

	// labeler and chat
	"app.bsky.labeler.service":    true,
	"chat.bsky.actor.declaration": true,
}
37
// AtprotoAllowlist is the set of collection-name prefixes for non-Bluesky
// records; a collection is tracked when it starts with any of these keys.
var AtprotoAllowlist = map[string]bool{
	"blue.zio.atfile":       true,
	"com.shinolabs.pinksea": true,
	"com.whtwnd":            true,
	"events.smokesignal":    true,
	"fyi.unravel":           true,
	"social.psky":           true,
	"xyz.statusphere":       true,
}
47
48func trackedRecordType(collection string) bool {
49 for k, _ := range AppBskyAllowlist {
50 if collection == k {
51 return true
52 }
53 }
54 for k, _ := range AtprotoAllowlist {
55 if strings.HasPrefix(collection, k) {
56 return true
57 }
58 }
59 return false
60}
61
62func handler(ctx context.Context, events <-chan jetstream.Event) {
63 rdb := redis.NewClient(&redis.Options{
64 Addr: "localhost:6379",
65 Password: "",
66 DB: 0,
67 })
68 pipe := rdb.Pipeline()
69 var eventCount int
70
71eventLoop:
72 for event := range events {
73 select {
74 case <-ctx.Done():
75 break eventLoop
76 default:
77 }
78
79 if event.Kind != jetstream.EventKindCommit {
80 continue
81 }
82 if event.Commit.Operation != jetstream.CommitOperationCreate {
83 continue
84 }
85
86 commit := *event.Commit
87 collection := commit.Collection
88
89 // if collection doesn't start with either allowlist, continue
90 if !trackedRecordType(collection) {
91 continue
92 }
93
94 // if collection starts with one of the Atproto allowlist keys, incr
95 for k, _ := range AtprotoAllowlist {
96 if strings.HasPrefix(collection, k) {
97 ckey := strings.ReplaceAll(k, ".", "_")
98 if err := pipe.Incr(ctx, "dev.edavis.atproto.collection."+ckey); err != nil {
99 log.Printf("failed incrementing an atproto collection: %v\n", err)
100 }
101 }
102 }
103
104 // if a post with an embed, incr that $embed type
105 if collection == "app.bsky.feed.post" {
106 var post appbsky.FeedPost
107 if err := json.Unmarshal(commit.Record, &post); err != nil {
108 log.Printf("error parsing appbsky.FeedPost: %v\n", err)
109 }
110 if post.Embed != nil {
111 var ekey string
112 switch {
113 case post.Embed.EmbedImages != nil:
114 ekey = post.Embed.EmbedImages.LexiconTypeID
115 case post.Embed.EmbedVideo != nil:
116 ekey = post.Embed.EmbedVideo.LexiconTypeID
117 case post.Embed.EmbedExternal != nil:
118 ekey = post.Embed.EmbedExternal.LexiconTypeID
119 case post.Embed.EmbedRecord != nil:
120 ekey = post.Embed.EmbedRecord.LexiconTypeID
121 case post.Embed.EmbedRecordWithMedia != nil:
122 ekey = post.Embed.EmbedRecordWithMedia.LexiconTypeID
123 }
124 if ekey == "" {
125 continue
126 }
127 if err := pipe.Incr(ctx, "app.bsky.feed.post:embed:"+ekey).Err(); err != nil {
128 log.Printf("failed incrementing embed key: %v\n", err)
129 }
130 }
131 }
132
133 // incr the collection and ops
134 if err := pipe.Incr(ctx, collection).Err(); err != nil {
135 log.Printf("failed incrementing collection: %v\n", err)
136 }
137
138 if err := pipe.Incr(ctx, `dev.edavis.muninsky.ops`).Err(); err != nil {
139 log.Printf("failed incrementing ops: %v\n", err)
140 }
141
142 // add one to the count, every 500 ops execute the piepline
143 eventCount += 1
144 if eventCount%500 == 0 {
145 if _, err := pipe.Exec(ctx); err != nil {
146 log.Printf("failed to exec pipe\n")
147 }
148 }
149 }
150}
151
152func main() {
153 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
154 defer stop()
155
156 conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil)
157 if err != nil {
158 log.Fatalf("failed to open websocket: %v\n", err)
159 }
160 defer func() {
161 if err := conn.Close(); err != nil {
162 log.Printf("failed to close websocket: %v\n", err)
163 }
164 log.Printf("websocket closed\n")
165 }()
166
167 jetstreamEvents := make(chan jetstream.Event)
168 go handler(ctx, jetstreamEvents)
169
170 log.Printf("starting up\n")
171 var event jetstream.Event
172 go func() {
173 for {
174 event = jetstream.Event{}
175 err := conn.ReadJSON(&event)
176 if err != nil {
177 log.Printf("ReadJSON error: %v\n", err)
178 stop()
179 break
180 } else {
181 jetstreamEvents <- event
182 }
183 }
184 }()
185
186 <-ctx.Done()
187 log.Printf("shutting down\n")
188}